Я думаю, что вы можете сделать это с помощью функции window
и некоторых других созданий столбцов с помощью withColumn
.
Код, который я сделал, должен создать отображение для устройств и создать таблицу с продолжительностью длякаждый штат.Единственное требование заключается в том, что подключение и отключение появляются поочередно.
Затем вы можете использовать следующий код:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window
import datetime
test_df = sqlContext.createDataFrame([(1,datetime.datetime(2019,2,5,8),"connect",1),
(2,datetime.datetime(2019,2,5,8,0,5),"disconnect",1),
(3,datetime.datetime(2019,2,5,8,10),"connect",1),
(4,datetime.datetime(2019,2,5,8,20),"disconnect",1),],
["Id","TimeStamp","Event","DeviceId"])
#creation of dataframe with 4 events for 1 device
test_df.show()
Вывод:
+---+-------------------+----------+--------+
| Id| TimeStamp| Event|DeviceId|
+---+-------------------+----------+--------+
| 1|2019-02-05 08:00:00| connect| 1|
| 2|2019-02-05 08:00:05|disconnect| 1|
| 3|2019-02-05 08:10:00| connect| 1|
| 4|2019-02-05 08:20:00|disconnect| 1|
+---+-------------------+----------+--------+
Затем вы можете создать вспомогательные функции и окно:
my_window = Window.partitionBy("DeviceId").orderBy(col("TimeStamp").desc()) #create window
get_prev_time = lag(col("Timestamp"),1).over(my_window) #get previous timestamp
time_diff = get_prev_time.cast("long") - col("TimeStamp").cast("long") #compute duration
test_df.withColumn("EventDuration",time_diff)\
.withColumn("EndDateTime",get_prev_time)\ #apply the helper functions
.withColumnRenamed("TimeStamp","StartDateTime")\ #rename according to your schema
.withColumn("State",when(col("Event")=="connect", "connected").otherwise("disconnected"))\ #create the state column
.filter(col("EventDuration").isNotNull()).select("Id","StartDateTime","EndDateTime","EventDuration","State","DeviceId").show()
#finally some filtering for the last events, which do not have a previous time
Выход:
+---+-------------------+-------------------+-------------+------------+--------+
| Id| StartDateTime| EndDateTime|EventDuration| State|DeviceId|
+---+-------------------+-------------------+-------------+------------+--------+
| 3|2019-02-05 08:10:00|2019-02-05 08:20:00| 600| connected| 1|
| 2|2019-02-05 08:00:05|2019-02-05 08:10:00| 595|disconnected| 1|
| 1|2019-02-05 08:00:00|2019-02-05 08:00:05| 5| connected| 1|
+---+-------------------+-------------------+-------------+------------+--------+