Spark: анализ даты / времени с разными форматами (ММ-дд-гггг ЧЧ: мм, ММ / дд / гг Чч: мм) в одном и том же столбце кадра данных - PullRequest
1 голос
/ 02 апреля 2020

Проблема в том, что у меня есть набор данных, где столбец имеет 2 или более типов формата даты. Обычно я выбираю все значения в качестве типа String, а затем использую to_date для анализа даты. Но я не знаю, как мне разобрать столбец с двумя или более типами форматов даты.

val DF= Seq(("02-04-2020 08:02"),("03-04-2020 10:02"),("04-04-2020 09:00"),("04/13/19 9:12"),("04/14/19 2:13"),("04/15/19 10:14"), ("04/16/19 5:15")).toDF("DOB")

import org.apache.spark.sql.functions.{to_date, to_timestamp}
val DOBDF = DF.withColumn("Date", to_date($"DOB", "MM/dd/yyyy"))

Вывод команды выше:

null
null
null
0019-04-13
0019-04-14
0019-04-15
0019-04-16

Код выше у меня есть записано не работает для формата MM/dd/yyyy и формата, который не предусматривал этого, я получаю null в качестве вывода.

Поэтому обращаюсь за помощью для анализа файла с различными форматами даты. Если возможно, пожалуйста, поделитесь некоторыми учебниками или примечаниями к сделке с форматами даты. Обратите внимание: я использую Scala для каркаса искры.

Заранее спасибо.

Ответы [ 2 ]

2 голосов
/ 02 апреля 2020

Хорошо, давайте сделаем это методом try-catch. Попробуйте преобразовать столбцы для каждого формата и сохраните значение успеха. Возможно, вам придется предоставить весь возможный формат извне в качестве параметра или сохранить основной список всех возможных форматов где-то в самом коде.

Вот возможное решение .. (Вместо SimpleDateFormatter, который иногда имеет проблемы с отметками времени за миллисекунды я использую новую библиотеку - java .time.format.DateTimeFormatter)

Создание функции to_timestamp, которая принимает строку для преобразования в метку времени и все возможные форматы

  import java.time.LocalDate
  import java.time.LocalDateTime
  import java.time.LocalTime
  import java.time.format.DateTimeFormatter
  import scala.util.Try

def toTimestamp(date: String, tsformats: Seq[String]): Option[java.sql.Timestamp] = {

    val out = (for (tsft <- tsformats) yield {
      val formatter = new DateTimeFormatterBuilder()
        .parseCaseInsensitive()
        .appendPattern(tsft).toFormatter()
      if (Try(java.sql.Timestamp.valueOf(LocalDateTime.parse(date, formatter))).isSuccess)
        Option(java.sql.Timestamp.valueOf(LocalDateTime.parse(date, formatter)))
      else None

    }).filter(_.isDefined)
    if (out.isEmpty) None else out.head
  }

Создать UDF поверх него - (этот udf принимает в качестве параметра строки Seq of Format)

 def UtoTimestamp(tsformats: Seq[String]) = org.apache.spark.sql.functions.udf((date: String) => toTimestamp(date, tsformats))

А теперь просто используйте его в своем искровом коде. Вот тест с вашими данными -

    val DF = Seq(("02-04-2020 08:02"), ("03-04-2020 10:02"), ("04-04-2020 09:00"), ("04/13/19 9:12"), ("04/14/19 2:13"), ("04/15/19 10:14"), ("04/16/19 5:15")).toDF("DOB")

    val tsformats = Seq("MM-dd-yyyy HH:mm", "MM/dd/yy H:mm")

    DF.select(UtoTimestamp(tsformats)('DOB)).show

А вот вывод -

+-------------------+
|           UDF(DOB)|
+-------------------+
|2020-02-04 08:02:00|
|2020-03-04 10:02:00|
|2020-04-04 09:00:00|
|2019-04-13 09:12:00|
|2019-04-14 02:13:00|
|2019-04-15 10:14:00|
|2019-04-16 05:15:00|
+-------------------+


Cherry наверху позволит избежать записи UtoTimestamp (colname) для многих столбцов в вашем фрейме данных. Давайте напишем функцию, которая принимает Dataframe, список всех столбцов Timestamp и все возможные форматы, в которых ваши исходные данные могут иметь закодированные временные метки в ..

Он будет анализировать все столбцы timestamp для вас, пытаясь сопоставить форматы. .

def WithTimestampParsed(df: DataFrame, tsCols: Seq[String], tsformats: Seq[String]): DataFrame = {

    val colSelector = df.columns.map {
      c =>
        {
          if (tsCols.contains(c)) UtoTimestamp(tsformats)(col(c)) alias (c)
          else col(c)
        }
    }

Используйте это так -

// You can pass as many column names in a sequence to be parsed
WithTimestampParsed(DF, Seq("DOB"), tsformats).show

Вывод -

+-------------------+
|                DOB|
+-------------------+
|2020-02-04 08:02:00|
|2020-03-04 10:02:00|
|2020-04-04 09:00:00|
|2019-04-13 09:12:00|
|2019-04-14 02:13:00|
|2019-04-15 10:14:00|
|2019-04-16 05:15:00|
+-------------------+
0 голосов
/ 02 апреля 2020

Я поместил код, который может вам чем-то помочь. Я пробовал это

mport org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import java.sql.Date
import java.util.{GregorianCalendar}


object DateFormats {

  val spark = SparkSession
    .builder()
    .appName("Multiline")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions", "4") //Change to a more reasonable default number of partitions for our data
    .config("spark.app.id", "Multiline")  // To silence Metrics warning
    .getOrCreate()

  val sc = spark.sparkContext

  def main(args: Array[String]): Unit = {

    Logger.getRootLogger.setLevel(Level.ERROR)


    try {

      import spark.implicits._

      val DF = Seq(("02-04-2020 08:02"),("03-04-2020 10:02"),("04-04-2020 09:00"),("04/13/19 9:12"),("04/14/19 2:13"),("04/15/19 10:14"), ("04/16/19 5:15")).toDF("DOB")

      import org.apache.spark.sql.functions.{to_date, to_timestamp}
      val DOBDF = DF.withColumn("Date", to_date($"DOB", "MM/dd/yyyy"))

      DOBDF.show()

      // todo: my code below
      DF
        .rdd
        .map(r =>{
            if(r.toString.contains("-")) {
              val dat = r.toString.substring(1,11).split("-")
              val calendar = new GregorianCalendar(dat(2).toInt,dat(1).toInt - 1,dat(0).toInt)
              (r.toString, new Date(calendar.getTimeInMillis))
            } else {
              val dat = r.toString.substring(1,9).split("/")
              val calendar = new GregorianCalendar(dat(2).toInt + 2000,dat(0).toInt - 1,dat(1).toInt)
              (r.toString, new Date(calendar.getTimeInMillis))
            }

        })
        .toDF("DOB","DATE")
        .show()

      // To have the opportunity to view the web console of Spark: http://localhost:4040/
      println("Type whatever to the console to exit......")
      scala.io.StdIn.readLine()
    } finally {
      sc.stop()
      println("SparkContext stopped.")
      spark.stop()
      println("SparkSession stopped.")
    }
  }
}
+------------------+----------+
|               DOB|      DATE|
+------------------+----------+
|[02-04-2020 08:02]|2020-04-02|
|[03-04-2020 10:02]|2020-04-03|
|[04-04-2020 09:00]|2020-04-04|
|   [04/13/19 9:12]|2019-04-13|
|   [04/14/19 2:13]|2019-04-14|
|  [04/15/19 10:14]|2019-04-15|
|   [04/16/19 5:15]|2019-04-16|
+------------------+----------+

С уважением

...