Я новичок в PySpark. У меня есть таблица в SQL Server df
следующим образом:
DeviceID TimeStamp A B C
00234 11-03-2014 05:55 5.6 2.3 3.3
00235 11-03-2014 05:33 2.8 0.9 4.2
00236 11-03-2014 06:15 3.5 0.1 1.3
00234 11-03-2014 07:23 2.5 0.2 3.9
00236 11-03-2014 07:33 2.5 4.5 2.9
Цель / Что я хочу:
Найти max
значений каждого DeviceID
и их соответствующих TimeStamp
. Кроме того, мне также нужно иметь текущую метку времени, чтобы ежедневно я знал, когда значение max
действительно для каждого DeviceID
.
Таким образом, результирующий df_final
должен быть похож на
DeviceID Max_Value TimeStamp Curr_TimeStamp
00234 5.6 11-03-2014 05:55 11-03-2014 23:54
00236 4.5 11-03-2014 07:33 11-03-2014 23:54
Для достижения вышеуказанного df_final
я использовал функцию Window
. Ниже приведены мои фрагменты кода.
import pyspark.sql.functions as F
from pyspark.sql import Window
from pyspark.sql import SparkSession
##Initialize Spark Session##
spark = SparkSession.builder.appName("test").config("spark.driver.extraClassPath", "/path/to/sqljdbc-6.4.jar").getOrCreate()
##Fetch data from SQL Server table df ##
df = spark.read.format("jdbc").options(url="SQL Server details",properties =
{ "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
},dbtable="df").load()
##Create a Temp View for further processing##
df.createOrReplaceTempView("df_temp")
##Get only a days data##
df_view = spark.sql("select * from df_temp where TimeStamp between date_add(current_date(),-1) and current_date()")
#Finally creating the dataframe df_final as required##
w = Window.partitionBy('DeviceImei')
df_final = df_view.select('DeviceImei','DeviceTimeStamp',F.greatest('IL1','IL2','IL3').alias('max_value'))
df_final = df_final.withColumn('Max-TimeStamp',F.max('max_value').over(w)).where(F.col('max_value') == F.col('Max-TimeStamp')).drop('Max-TimeStamp').withColumn('TimeStamp',F.current_timestamp())
Пока все хорошо !! Однако странная вещь происходит, когда я экспортирую это в другую таблицу SQL Server.
df_final.write.jdbc(url="SQL Server details", table="MaxLoad", mode="append", properties={ "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver" })
Что я получаю следующим образом:
DeviceID Max_Value TimeStamp Curr_TimeStamp
00234 5.6 10-03-2014 10:55 11-03-2014 23:54
00236 4.5 10-03-2014 12:33 11-03-2014 23:54
Как видите, значения TimeStamp
были изменены !!
Почему это происходит? Я что-то пропустил в коде? Я проверил метку системного времени на компьютере с Spark и SQL-сервером, и они идеальны.
Любая помощь будет оценена.
P.S: Spark 2.4.1 работает на CentOS 7, и я использую SQL Server 2014 на компьютере с Windows Server 2008 R2