Чтобы выполнить это преобразование динамически, вам придется перебирать все столбцы и выполнять различные операции в зависимости от типа столбца.
Вот пример:
import java.sql.Date
import org.apache.spark.sql.types._
import java.sql.Timestamp
val originalDf = Seq(
(Timestamp.valueOf("2016-09-30 03:04:00"),Date.valueOf("2016-09-30")),
(Timestamp.valueOf("2016-07-30 00:00:00"),Date.valueOf("2016-10-30"))
).toDF("ts_value","date_value")
Исходные данные таблицы:
> originalDf.show
+-------------------+----------+
| ts_value|date_value|
+-------------------+----------+
|2016-09-30 03:04:00|2016-09-30|
|2016-07-30 00:00:00|2016-10-30|
+-------------------+----------+
> originalDf.printSchema
root
|-- ts_value: timestamp (nullable = true)
|-- date_value: date (nullable = true)
Пример операции преобразования:
val newDf = originalDf.columns.foldLeft(originalDf)((df, name) => {
val data_type = df.schema(name).dataType
if(data_type == DateType)
df.withColumn(name, date_format(col(name), "yyyyMMdd").cast(IntegerType))
else if(data_type == TimestampType)
df.withColumn(name, year(col(name))*10000 + month(col(name))*100 + dayofmonth(col(name)))
else
df
})
Новые данные таблицы:
newDf.show
+--------+----------+
|ts_value|date_value|
+--------+----------+
|20160930| 20160930|
|20160730| 20161030|
+--------+----------+
newDf.printSchema
root
|-- ts_value: integer (nullable = true)
|-- date_value: integer (nullable = true)
Если вы не хотите выполнять эту операцию во всех столбцах, вы можете вручную указать столбцы, изменив
val newDf = originalDf.columns.foldLeft ...
на
val newDf = Seq("col1_name","col2_name", ... ).foldLeft ...
Hopeэто помогает!