PySpark: метка времени изменяется при экспорте в SQL Server - PullRequest
0 голосов
/ 14 мая 2019

Я новичок в PySpark. У меня есть таблица в SQL Server df следующим образом:

DeviceID       TimeStamp            A      B     C
 00234       11-03-2014 05:55      5.6    2.3   3.3
 00235       11-03-2014 05:33      2.8    0.9   4.2
 00236       11-03-2014 06:15      3.5    0.1   1.3
 00234       11-03-2014 07:23      2.5    0.2   3.9
 00236       11-03-2014 07:33      2.5    4.5   2.9

Цель / Что я хочу: Найти max значений каждого DeviceID и их соответствующих TimeStamp. Кроме того, мне также нужно иметь текущую метку времени, чтобы ежедневно я знал, когда значение max действительно для каждого DeviceID.

Таким образом, результирующий df_final должен быть похож на

DeviceID    Max_Value       TimeStamp           Curr_TimeStamp
00234          5.6        11-03-2014 05:55     11-03-2014 23:54
00236          4.5        11-03-2014 07:33     11-03-2014 23:54

Для достижения вышеуказанного df_final я использовал функцию Window. Ниже приведены мои фрагменты кода.

import pyspark.sql.functions as F
from pyspark.sql import Window
from pyspark.sql import SparkSession

##Initialize Spark Session##
spark = SparkSession.builder.appName("test").config("spark.driver.extraClassPath", "/path/to/sqljdbc-6.4.jar").getOrCreate()
##Fetch data from SQL Server table df ##
df = spark.read.format("jdbc").options(url="SQL Server details",properties = 
{ "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver" 
},dbtable="df").load()

##Create a Temp View for further processing##
df.createOrReplaceTempView("df_temp")

##Get only a days data##
df_view = spark.sql("select * from df_temp where TimeStamp between date_add(current_date(),-1) and current_date()")

#Finally creating the dataframe df_final as required##
w = Window.partitionBy('DeviceImei')
df_final = df_view.select('DeviceImei','DeviceTimeStamp',F.greatest('IL1','IL2','IL3').alias('max_value'))
df_final = df_final.withColumn('Max-TimeStamp',F.max('max_value').over(w)).where(F.col('max_value') == F.col('Max-TimeStamp')).drop('Max-TimeStamp').withColumn('TimeStamp',F.current_timestamp())

Пока все хорошо !! Однако странная вещь происходит, когда я экспортирую это в другую таблицу SQL Server.

df_final.write.jdbc(url="SQL Server details", table="MaxLoad", mode="append", properties={ "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver" })

Что я получаю следующим образом:

DeviceID    Max_Value         TimeStamp         Curr_TimeStamp
00234          5.6        10-03-2014 10:55     11-03-2014 23:54
00236          4.5        10-03-2014 12:33     11-03-2014 23:54

Как видите, значения TimeStamp были изменены !!

Почему это происходит? Я что-то пропустил в коде? Я проверил метку системного времени на компьютере с Spark и SQL-сервером, и они идеальны.

Любая помощь будет оценена.

P.S: Spark 2.4.1 работает на CentOS 7, и я использую SQL Server 2014 на компьютере с Windows Server 2008 R2

1 Ответ

1 голос
/ 14 мая 2019

Бьюсь об заклад, ваш Spark Cluster и MS SQL Server расположены в разных часовых поясах. Я испытал это, и решение было бы использовать UTC TZ, установив conf spark.conf.set("spark.sql.session.timeZone", "Etc/UTC"). Устанавливая этот параметр, ваши метки времени должны давать вам то, что вы ожидаете, если сохранитесь на MS SQL Server.

Отказ от ответственности: Я думаю, что установка конф. Spark TZ в UTC решит вашу проблему, однако ваш формат отметки времени может также вызывать проблемы ... рекомендуемый формат Java - гггг-ММ-дд ЧЧ: мм: сс

Вот пример того, как одна из ваших временных меток ведет себя по-разному в разных часовых поясах

spark.version
'2.4.3'

from pyspark.sql.functions import *

# you can check Spark Cluster TZ like this
spark.conf.get("spark.sql.session.timeZone")
"will list your server tz here"

# change to UTC to fix problem / preserve event time source data timestamp
spark.conf.set("spark.sql.session.timeZone", "Etc/UTC")

# let's take one of your timestamp and convert to unix for testing
ut = spark.createDataFrame([('11-03-2014 05:55',)], ['ut'])
ut.select(unix_timestamp('ut', 'MM-dd-yyyy HH:mm').alias('ut')).show()

+----------+
|        ut|
+----------+
|1414994100|
+----------+

# let's test the output with a system set at LA TZ to see the timestamp changes
spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")

la_time = spark.createDataFrame([(1414994100,)], ['la_tz'])
la_time.select(from_unixtime('la_tz').alias('la_tz')).show() # different ts as source

+-------------------+
|              la_tz|
+-------------------+
|2014-11-02 21:55:00|
+-------------------+

# set TZ back to UTC to confirm timestamp has preserved source data event time
spark.conf.set("spark.sql.session.timeZone", "Etc/UTC")

utc = spark.createDataFrame([(1414994100,)], ['utc_tz'])
utc.select(from_unixtime('utc_tz').alias('utc_tz')).show() # same ts as source

+-------------------+
|             utc_tz|
+-------------------+
|2014-11-03 05:55:00|
+-------------------+

# reset TZ conf if you want
spark.conf.unset("spark.sql.session.timeZone")

# if you want to change your timestamp format
ts = spark.createDataFrame([('11-03-2014 05:55',)], ['ts'])
ts.select(to_timestamp('ts', 'MM-dd-yyyy HH:mm').alias('ts')).show()

+-------------------+
|                 ts|
+-------------------+
|2014-11-03 05:55:00|
+-------------------+

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...