Вы должны изменить dateFormat
на timestampFormat
, поскольку вы вводите метку времени, а не дату. Также значение формата отметки времени должно быть mm/dd/yyyy h:mm:ss a
.
Пример данных:
Seq(
"1|1/15/2019 2:24:00 AM",
"2|test",
"3|5/30/1981 3:11:00 PM"
).toDF().write.text("/tmp/input/csvDateReadTest")
С изменениями для отметки времени:
val df = spark.read.format("csv")
.schema("id int, dt timestamp")
.option("delimiter","|")
.option("badRecordsPath","/tmp/badRecordsPath")
.option("timestampFormat","mm/dd/yyyy h:mm:ss a")
.load("/tmp/input/csvDateReadTest")
А на выходе:
+----+-------------------+
| id| dt|
+----+-------------------+
| 1|2019-01-15 02:24:00|
| 3|1981-01-30 15:11:00|
|null| null|
+----+-------------------+
Обратите внимание, что запись с идентификатором 2 не соответствует определению схемы и поэтому будет содержать null
. Если вы также хотите сохранить недействительные записи, вам нужно изменить столбец отметки времени на строку, и в этом случае на выходе будет:
+---+--------------------+
| id| dt|
+---+--------------------+
| 1|1/15/2019 2:24:00 AM|
| 3|5/30/1981 3:11:00 PM|
| 2| test|
+---+--------------------+
UPDATE:
Чтобы изменить строку dt на тип отметки времени, вы можете попробовать с df.withColumn("dt", $"dt".cast("timestamp"))
, хотя это не удастся и все значения будут заменены на нуль.
Вы можете достичь этого с помощью следующего кода:
import org.apache.spark.sql.Row
import java.text.SimpleDateFormat
import java.util.{Date, Locale}
import java.sql.Timestamp
import scala.util.{Try, Success, Failure}
val formatter = new SimpleDateFormat("mm/dd/yyyy h:mm:ss a", Locale.US)
df.map{ case Row(id:Int, dt:String) =>
val tryParse = Try[Date](formatter.parse(dt))
val p_timestamp = tryParse match {
case Success(parsed) => new Timestamp(parsed.getTime())
case Failure(_) => null
}
(id, p_timestamp)
}.toDF("id", "dt").show
Выход:
+---+-------------------+
| id| dt|
+---+-------------------+
| 1|2019-01-15 02:24:00|
| 3|1981-01-30 15:11:00|
| 2| null|
+---+-------------------+