Хорошо, давайте сделаем это методом try-catch. Попробуйте преобразовать столбцы для каждого формата и сохраните значение успеха. Возможно, вам придется предоставить весь возможный формат извне в качестве параметра или сохранить основной список всех возможных форматов где-то в самом коде.
Вот возможное решение .. (Вместо SimpleDateFormatter, который иногда имеет проблемы с отметками времени за миллисекунды я использую новую библиотеку - java .time.format.DateTimeFormatter)
Создание функции to_timestamp, которая принимает строку для преобразования в метку времени и все возможные форматы
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime
import java.time.format.DateTimeFormatter
import scala.util.Try
def toTimestamp(date: String, tsformats: Seq[String]): Option[java.sql.Timestamp] = {
val out = (for (tsft <- tsformats) yield {
val formatter = new DateTimeFormatterBuilder()
.parseCaseInsensitive()
.appendPattern(tsft).toFormatter()
if (Try(java.sql.Timestamp.valueOf(LocalDateTime.parse(date, formatter))).isSuccess)
Option(java.sql.Timestamp.valueOf(LocalDateTime.parse(date, formatter)))
else None
}).filter(_.isDefined)
if (out.isEmpty) None else out.head
}
Создать UDF поверх него - (этот udf принимает в качестве параметра строки Seq of Format)
def UtoTimestamp(tsformats: Seq[String]) = org.apache.spark.sql.functions.udf((date: String) => toTimestamp(date, tsformats))
А теперь просто используйте его в своем искровом коде. Вот тест с вашими данными -
val DF = Seq(("02-04-2020 08:02"), ("03-04-2020 10:02"), ("04-04-2020 09:00"), ("04/13/19 9:12"), ("04/14/19 2:13"), ("04/15/19 10:14"), ("04/16/19 5:15")).toDF("DOB")
val tsformats = Seq("MM-dd-yyyy HH:mm", "MM/dd/yy H:mm")
DF.select(UtoTimestamp(tsformats)('DOB)).show
А вот вывод -
+-------------------+
| UDF(DOB)|
+-------------------+
|2020-02-04 08:02:00|
|2020-03-04 10:02:00|
|2020-04-04 09:00:00|
|2019-04-13 09:12:00|
|2019-04-14 02:13:00|
|2019-04-15 10:14:00|
|2019-04-16 05:15:00|
+-------------------+
Cherry наверху позволит избежать записи UtoTimestamp (colname) для многих столбцов в вашем фрейме данных. Давайте напишем функцию, которая принимает Dataframe, список всех столбцов Timestamp и все возможные форматы, в которых ваши исходные данные могут иметь закодированные временные метки в ..
Он будет анализировать все столбцы timestamp для вас, пытаясь сопоставить форматы. .
def WithTimestampParsed(df: DataFrame, tsCols: Seq[String], tsformats: Seq[String]): DataFrame = {
val colSelector = df.columns.map {
c =>
{
if (tsCols.contains(c)) UtoTimestamp(tsformats)(col(c)) alias (c)
else col(c)
}
}
Используйте это так -
// You can pass as many column names in a sequence to be parsed
WithTimestampParsed(DF, Seq("DOB"), tsformats).show
Вывод -
+-------------------+
| DOB|
+-------------------+
|2020-02-04 08:02:00|
|2020-03-04 10:02:00|
|2020-04-04 09:00:00|
|2019-04-13 09:12:00|
|2019-04-14 02:13:00|
|2019-04-15 10:14:00|
|2019-04-16 05:15:00|
+-------------------+