Метка времени Parse Micro / Nano Seconds в spark-csv Считывателе данных: несовместимые результаты - PullRequest
2 голосов
/ 25 октября 2019

Я пытаюсь прочитать CSV-файл, который имеет временные метки до нано секунд. пример содержимого файла TestTimestamp.csv-

spark- 2.4.0, scala - 2.11.11

   /**
     * TestTimestamp.csv -
     * 101,2019-SEP-23 11.42.35.456789123 AM
     *
     */

Пытался прочитать егоusing timestampFormat = "yyyy-MMM-dd hh.mm.ss.SSSSSSSSS aaa"

val dataSchema = StructType(Array(StructField("ID", DoubleType, true), StructField("Created_TS", TimestampType, true)))

val data = spark.read.format("csv")
      .option("header", "false")
      .option("inferSchema", "false")
      .option("treatEmptyValuesAsNulls", "true")
      //.option("nullValue", "")
      .option("dateFormat", "yyyy-MMM-dd")
      .option("timestampFormat", "yyyy-MMM-dd hh.mm.ss.SSSSSSSSS aaa")
      .schema(dataSchema)
      .load("C:\\TestData\\Raw\\TetraPak\\Shipments\\TestTimeStamp.csv")

    data.select('Created_TS).show

Вывод, который я получаю, является совершенно неправильной датой-временем. 23 сентября изменено на 28 сентября

+--------------------+
|          Created_TS|
+--------------------+
|2019-09-28 18:35:...|
+--------------------+

Даже если у меня есть Часы в 24-часовых форматах, таких как - «2019-SEP-23 16.42.35.456789123», и я пытаюсь использовать только первые несколько цифр вторых дробей подавая timestampFormat = "yyyy-MMM-dd HH.mm.ss.SSS"

похожий неверный результат -

val data2 = spark.read.format("csv")
      .option("header", "false")
      .option("inferSchema", "false")
      .option("treatEmptyValuesAsNulls", "true")
      //.option("nullValue", "")
      .option("dateFormat", "yyyy-MMM-dd")
      .option("timestampFormat", "yyyy-MMM-dd hh.mm.ss.SSS")
      .schema(dataSchema)
      .load("C:\\TestData\\Raw\\TetraPak\\Shipments\\TestTimeStamp.csv")

    data2.select('Created_TS).show

+--------------------+
|          Created_TS|
+--------------------+
|2019-09-28 23:35:...|
+--------------------+

, есть ли способ проанализировать такие строки меток времени при создании фрейм данных с использованием csv reader ?

Ответы [ 2 ]

2 голосов
/ 25 октября 2019

Вот решение, вдохновленное ответом Вернера об использовании udfs ..-

Входной CSV -

101,2019-SEP-23 11.42.35.456789123 AM,2019-SEP-23 11.42.35.456789123 AM,2019-SEP-23 11.42.35.456789123 AM

Исходная схема со столбцами TimestampType

val orig_schema = StructType(Array(StructField("ID", DoubleType, true), StructField("Created_TS", TimestampType, true), StructField("Updated_TS", TimestampType, true), StructField("Modified_TS", TimestampType, true)))

Конвертироватьвсе TimestampType в StringType

val dataSchema = StructType(orig_schema.map(x =>
      {
        x.dataType match {
          case TimestampType => StructField(x.name, StringType, x.nullable)
          case _             => x
        }

      }))

Функция toDate для преобразования строки в Timstamp

//TODO parameterize string formats

    def toDate(date: String): java.sql.Timestamp = {
      val formatter = new DateTimeFormatterBuilder()
        .parseCaseInsensitive()
        .appendPattern("yyyy-MMM-dd hh.mm.ss.SSSSSSSSS a").toFormatter()
      Timestamp.valueOf(LocalDateTime.parse(date, formatter))
    }

// register toDate as udf
val to_timestamp = spark.sqlContext.udf.register("to_timestamp", toDate _)

Создание выражения столбца для выбора из необработанного кадра данных

// Array of Column Name & Types
    val nameType: Array[(String, DataType)] = orig_schema.fields.map(f => (f.name, f.dataType))

// Create Column Expression to select from raw Dataframe
val selectExpr = nameType.map(f => {
      f._2 match {
        case TimestampType => expr(s"CASE WHEN ${f._1} is NULL THEN NULL ELSE to_timestamp(${f._1}) END AS ${f._1}")
        case _             => expr(s"${f._1}")
      }
    })

Чтение как StringType,Используйте выражение селектора столбцов, которое использует udf для преобразования строки в метку времени

val data = spark.read.format("csv")
      .option("header", "false")
      .option("inferSchema", "false")
      .option("treatEmptyValuesAsNulls", "true")
      //.option("nullValue", "")
      .option("dateFormat", "yyyy-MMM-dd")
      .option("timestampFormat", "yyyy-MMM-dd hh.mm.ss.SSSSSSSSS aaa")
      .schema(dataSchema)
.load("C:\\TestData\\Raw\\TetraPak\\Shipments\\TestTimestamp_new.csv").select(selectExpr: _*)

data.show

Вот желаемый вывод ... так что теперь мне не нужно беспокоиться о количестве столбцов и создании выражений с помощью udf вручную

+-----+--------------------+--------------------+--------------------+
|   ID|          Created_TS|          Updated_TS|         Modified_TS|
+-----+--------------------+--------------------+--------------------+
|101.0|2019-09-23 11:42:...|2019-09-23 11:42:...|2019-09-23 11:42:...|
+-----+--------------------+--------------------+--------------------+

1 голос
/ 25 октября 2019

DataFrameReader использует SimpleDateFormat для анализа дат:

timestampFormat (по умолчанию гггг-ММ-дд'Т'ЧЧ: мм: сс. SSSXXX): устанавливает строку, которая указывает формат метки времени. Пользовательские форматы даты следуют форматам в java.text.SimpleDateFormat. Это относится к типу отметки времени.

К сожалению, SimpleDateFormat не поддерживает наносекунды, поэтому часть ваших дат после последней точки будет интерпретироваться как 456789123 миллисекунды, что составляет около 126 часов. Это время добавляется к вашей дате, это объясняет странные результаты, которые вы видите. Более подробную информацию по этой теме можно найти в этом ответе .

Таким образом, даты должны быть проанализированы на втором шаге после чтения csv, например, с помощью udf, который использует DateTimeFormatter :

val dataSchema = StructType(Array(StructField("ID", DoubleType, true), StructField("Created_TS_String", StringType, true)))

var df = spark.read.option("header", false)
  .option("inferSchema", "false")
  .option("treatEmptyValuesAsNulls", "true")
  .schema(dataSchema)
  .csv("C:\\TestData\\Raw\\TetraPak\\Shipments\\TestTimeStamp.csv")

val toDate = udf((date: String) => {
  val formatter = new DateTimeFormatterBuilder()
    .parseCaseInsensitive()
    .appendPattern("yyyy-MMM-dd hh.mm.ss.SSSSSSSSS a").toFormatter()
  Timestamp.valueOf(LocalDateTime.parse(date, formatter))
})

df = df.withColumn("Created_TS", toDate('Created_TS_String))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...