не допустить смещения временной метки искры в спарк-оболочке - PullRequest
0 голосов
/ 09 октября 2019

Я запустил оболочку искры, используя:

spark-shell --conf spark.sql.session.timeZone=utc

При выполнении приведенного ниже примера результатом является отметка времени в столбце utc_shifted, которая перемещается. Он не содержит желаемый результат UDF, но что-то еще. Чтобы быть конкретным: вход UTC и искра сдвигает его снова. Как это поведение может быть исправлено?

+-----------------+-----------------------+-------------------+
|value            |utc_shifted            |fitting            |
+-----------------+-----------------------+-------------------+
|20191009145901202|2019-10-09 12:59:01.202|2019-10-09 14:59:01|
|20191009145514816|2019-10-09 12:55:14.816|2019-10-09 14:55:14|
+-----------------+-----------------------+-------------------+

Похоже, что пропуск параметра часового пояса по умолчанию исправит это, но я не уверен, может ли один из исполнителей иметь другой / неправильный часовой пояс, который я все еще получаюправильный результат. Поэтому я предпочитаю устанавливать его. Почему это не влияет на синтаксический анализ собственной метки времени? Как я могу получить аналогичное поведение для моей UDF?

Воспроизводимый пример:

val input = Seq("20191009145901202", "20191009145514816").toDF

import scala.util.{Failure, Success, Try}
import java.sql.Timestamp
import java.text.SimpleDateFormat
import org.apache.spark.sql.DataFrame

def parseTimestampWithMillis(
      timestampColumnInput: String,
      timestampColumnOutput: String,
      formatString: String)(df: DataFrame): DataFrame = {
    def getTimestamp(s: String): Option[Timestamp] = {
      if (s.isEmpty) {
        None
      } else {
        val format = new SimpleDateFormat(formatString)
        Try(new Timestamp(format.parse(s).getTime)) match {
          case Success(t) => {
            println(s"input: ${s}, output: ${t}")
            Some(t)
          }
          case Failure(_) => None
        }
      }
    }

    val getTimestampUDF = udf(getTimestamp _)
    df.withColumn(
      timestampColumnOutput, getTimestampUDF(col(timestampColumnInput)))
  }


input.transform(parseTimestampWithMillis("value", "utc_shifted", "yyyyMMddHHmmssSSS")).withColumn("fitting", to_timestamp(col("value"), "yyyyMMddHHmmssSSS")).show(false)

+-----------------+-----------------------+-------------------+
|value            |utc_shifted            |fitting            |
+-----------------+-----------------------+-------------------+
|20191009145901202|2019-10-09 12:59:01.202|2019-10-09 14:59:01|
|20191009145514816|2019-10-09 12:55:14.816|2019-10-09 14:55:14|
+-----------------+-----------------------+-------------------+

На самом деле этот параметр влияет не только на отображение, но и на вывод при записи в файл.

edit

По сути, я явно установил часовой пояс, как предложено в Spark Strutured Streaming автоматически преобразует метку времени в местное время , но получаю результат, который я считаю неверным.

1 Ответ

0 голосов
/ 09 октября 2019
spark-shell --conf spark.sql.session.timeZone=UTC --conf "spark.driver.extraJavaOptions=-Duser.timezone=UTC" --conf "spark.executor.extraJavaOptions=-Duser.timezone=UTC"

, кажется, дает мне желаемый результат.

Но это не объясняет, почему это применимо только к моему UDF, а не к искровой внутренней функции to_timestamp.

...