Если я правильно понимаю ваше требование, следующий пример иллюстрирует, как вернуть DataFrame с измененными типами столбцов к его исходной версии:
import org.apache.spark.sql.types._
val df1 = Seq(
(1, "a", 100L, 10.0), (2, "b", 200L, 20.0)
).toDF("c1", "c2", "c3", "c4")
val df2 = Seq(
(1, "a", 100, 10.0f), (2, "b", 200, 20.0f)
).toDF("c1", "c2", "c3", "c4")
df2.printSchema
// root
// |-- c1: integer (nullable = false)
// |-- c2: string (nullable = true)
// |-- c3: integer (nullable = false)
// |-- c4: float (nullable = false)
val fieldsDiffType = (df1.schema.fields zip df2.schema.fields).collect{
case (a: StructField, b: StructField) if a.dataType != b.dataType =>
(a.name, a.dataType)
}
// fieldsDiffType: Array[(String, org.apache.spark.sql.types.DataType)] =
// Array((c3,LongType), (c4,DoubleType))
val df2To1 = fieldsDiffType.foldLeft(df2)( (accDF, field) =>
accDF.withColumn(field._1, col(field._1).cast(field._2))
)
df2To1.printSchema
// root
// |-- c1: integer (nullable = false)
// |-- c2: string (nullable = true)
// |-- c3: long (nullable = false)
// |-- c4: double (nullable = false)
Обратите внимание, что это решение работает, только если столбцы DataFrame остаются прежнимипо размеру и порядку, и не распространяется на такие типы, как массив или структура.
[ОБНОВЛЕНИЕ]
Если есть опасения, что порядок столбцов может быть изменен, вы можете сначала заказать df1.schema.fields
иdf2.schema.fields
перед выполнением zip
:
df1.schema.fields.sortBy(_.name) zip df2.schema.fields.sortBy(_.name)