Подобные ошибки возникают, когда вы пытаетесь применить схему к RDD[Row]
, используя API разработчика функции:
def createDataFrame(rows: List[Row], schema: StructType): DataFrame
def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): DataFrame
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
В таких случаях хранимые типы данных должны соответствовать внешним (т.е. Тип значения в Scala ) типы данных , перечисленные в официальном SQL , и не применяется приведение типов или принуждение.
Поэтому вы, как пользователь, обязаны обеспечитьчто дата и схема совместимы.
Описание предоставленной вами проблемы указывает на довольно другой сценарий, который запрашивает CAST
.Давайте создадим набор данных с точно такой же схемой, как в вашем примере:
val yearDF = spark.createDataFrame(
sc.parallelize(Seq[Row]()),
StructType(Seq(
StructField("source_system_name", StringType),
StructField("table_refresh_delay_min", DecimalType(38, 30)),
StructField("release_number", DecimalType(38, 30)),
StructField("change_number", DecimalType(38, 30)),
StructField("interface_queue_enabled_flag", StringType),
StructField("rework_enabled_flag", StringType),
StructField("fdm_application_id", DecimalType(15, 0)),
StructField("history_enabled_flag", StringType)
)))
yearDF.printSchema
root
|-- source_system_name: string (nullable = true)
|-- table_refresh_delay_min: decimal(38,30) (nullable = true)
|-- release_number: decimal(38,30) (nullable = true)
|-- change_number: decimal(38,30) (nullable = true)
|-- interface_queue_enabled_flag: string (nullable = true)
|-- rework_enabled_flag: string (nullable = true)
|-- fdm_application_id: decimal(15,0) (nullable = true)
|-- history_enabled_flag: string (nullable = true)
и желаемыми типами, такими как
val dtypes = Seq(
"source_system_name" -> "string",
"table_refresh_delay_min" -> "double",
"release_number" -> "double",
"change_number" -> "double",
"interface_queue_enabled_flag" -> "string",
"rework_enabled_flag" -> "string",
"fdm_application_id" -> "long",
"history_enabled_flag" -> "string"
)
, тогда вы можете просто отобразить:
val mapping = dtypes.toMap
yearDF.select(yearDF.columns.map { c => col(c).cast(mapping(c)) }: _*).printSchema
root
|-- source_system_name: string (nullable = true)
|-- table_refresh_delay_min: double (nullable = true)
|-- release_number: double (nullable = true)
|-- change_number: double (nullable = true)
|-- interface_queue_enabled_flag: string (nullable = true)
|-- rework_enabled_flag: string (nullable = true)
|-- fdm_application_id: long (nullable = true)
|-- history_enabled_flag: string (nullable = true)
Это, конечно, предполагает, что фактический и требуемый типы совместимы, и CAST
разрешено .
Если у вас все еще возникают проблемы из-за особенностей конкретного драйвера JDBC, вам следует рассмотреть возможность размещения приведения непосредственно в запросе, либо вручную ( В Apache Spark 2.0.0, возможно ли получить запрос из внешней базы данных (а не получить всю таблицу)? )
val externalDtypes = Seq(
"source_system_name" -> "text",
"table_refresh_delay_min" -> "double precision",
"release_number" -> "float8",
"change_number" -> "float8",
"interface_queue_enabled_flag" -> "string",
"rework_enabled_flag" -> "string",
"fdm_application_id" -> "bigint",
"history_enabled_flag" -> "string"
)
val externalDtypes = dtypes.map {
case (c, t) => s"CAST(`$c` AS $t)"
} .mkString(", ")
val dbTable = s"""(select $fields from schema.tablename) as tmp"""
или через пользовательскую схему:
spark.read
.format("jdbc")
.option(
"customSchema",
dtypes.map { case (c, t) => s"`$c` $t" } .mkString(", "))
...
.load()