Как читать пользовательские даты в формате pyspark - PullRequest
2 голосов
/ 26 апреля 2019

Я хочу использовать spark.read () для извлечения данных из файла .csv, применяя схему. Тем не менее, я не могу получить искры, чтобы признать мои даты как отметки времени.

Сначала я создаю фиктивный файл для тестирования с

%scala
Seq("1|1/15/2019 2:24:00 AM","2|test","3|").toDF().write.text("/tmp/input/csvDateReadTest")

Затем я пытаюсь прочитать его и предоставить строку dateFormat, но он не распознает мои даты и отправляет записи в badRecordsPath

df = spark.read.format('csv')
               .schema("id int, dt timestamp")
               .option("delimiter","|")
               .option("badRecordsPath","/tmp/badRecordsPath")
               .option("dateFormat","M/dd/yyyy hh:mm:ss aaa")
               .load("/tmp/input/csvDateReadTest")

В результате я получаю только 1 запись в df (ID 3), когда я ожидаю увидеть 2. (ID 1 и 3)

df.show()

+---+----+
| id|  dt|
+---+----+
|  3|null|
+---+----+


Ответы [ 2 ]

0 голосов
/ 27 апреля 2019

Вы должны изменить dateFormat на timestampFormat, поскольку вы вводите метку времени, а не дату. Также значение формата отметки времени должно быть mm/dd/yyyy h:mm:ss a.

Пример данных:

Seq(
"1|1/15/2019 2:24:00 AM",
"2|test",
"3|5/30/1981 3:11:00 PM"
).toDF().write.text("/tmp/input/csvDateReadTest")

С изменениями для отметки времени:

val df = spark.read.format("csv")
               .schema("id int, dt timestamp")
               .option("delimiter","|")
               .option("badRecordsPath","/tmp/badRecordsPath")
               .option("timestampFormat","mm/dd/yyyy h:mm:ss a")
               .load("/tmp/input/csvDateReadTest")

А на выходе:

+----+-------------------+
|  id|                 dt|
+----+-------------------+
|   1|2019-01-15 02:24:00|
|   3|1981-01-30 15:11:00|
|null|               null|
+----+-------------------+

Обратите внимание, что запись с идентификатором 2 не соответствует определению схемы и поэтому будет содержать null. Если вы также хотите сохранить недействительные записи, вам нужно изменить столбец отметки времени на строку, и в этом случае на выходе будет:

+---+--------------------+
| id|                  dt|
+---+--------------------+
|  1|1/15/2019 2:24:00 AM|
|  3|5/30/1981 3:11:00 PM|
|  2|                test|
+---+--------------------+

UPDATE:

Чтобы изменить строку dt на тип отметки времени, вы можете попробовать с df.withColumn("dt", $"dt".cast("timestamp")), хотя это не удастся и все значения будут заменены на нуль.

Вы можете достичь этого с помощью следующего кода:

import org.apache.spark.sql.Row
import java.text.SimpleDateFormat
import java.util.{Date, Locale} 
import java.sql.Timestamp
import scala.util.{Try, Success, Failure}

val formatter = new SimpleDateFormat("mm/dd/yyyy h:mm:ss a", Locale.US)
df.map{ case Row(id:Int, dt:String) =>
    val tryParse = Try[Date](formatter.parse(dt))

    val p_timestamp = tryParse match {
        case Success(parsed) => new Timestamp(parsed.getTime())
        case Failure(_) => null
    }

    (id, p_timestamp)
}.toDF("id", "dt").show

Выход:

+---+-------------------+
| id|                 dt|
+---+-------------------+
|  1|2019-01-15 02:24:00|
|  3|1981-01-30 15:11:00|
|  2|               null|
+---+-------------------+
0 голосов
/ 26 апреля 2019

Привет, вот пример кода

df.withColumn("times", 
              from_unixtime(unix_timestamp(col("df"), "M/dd/yyyy hh:mm:ss a"),
              "yyyy-MM-dd HH:mm:ss.SSSSSS"))
  .show(false)
...