Расчет и агрегирование данных по дате / времени - PullRequest
0 голосов
/ 05 февраля 2019

Я работаю с таким фреймом данных:

Id     | TimeStamp         | Event     |  DeviceId
1      | 5.2.2019 8:00:00  | connect   |  1
2      | 5.2.2019 8:00:05  | disconnect|  1

Я использую блоки данных и pyspark для выполнения процесса ETL.Как я могу рассчитать и создать такой кадр данных, как показано внизу?Я уже пытался использовать UDF, но я не мог найти способ заставить его работать.Я также пытался сделать это, перебирая весь фрейм данных, но это очень медленно.

Я хочу объединить этот фрейм данных, чтобы получить новый фрейм данных, который сообщает мне время, сколько времени было подключено каждое устройствои отключен:

Id     | StartDateTime   | EndDateTime   | EventDuration  |State    |  DeviceId
1      | 5.2.19 8:00:00  | 5.2.19 8:00:05| 0.00:00:05     |connected|  1

1 Ответ

0 голосов
/ 05 февраля 2019

Я думаю, что вы можете сделать это с помощью функции window и некоторых других созданий столбцов с помощью withColumn.

Код, который я сделал, должен создать отображение для устройств и создать таблицу с продолжительностью длякаждый штат.Единственное требование заключается в том, что подключение и отключение появляются поочередно.

Затем вы можете использовать следующий код:

from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window
import datetime
test_df = sqlContext.createDataFrame([(1,datetime.datetime(2019,2,5,8),"connect",1),
(2,datetime.datetime(2019,2,5,8,0,5),"disconnect",1),
(3,datetime.datetime(2019,2,5,8,10),"connect",1),
(4,datetime.datetime(2019,2,5,8,20),"disconnect",1),], 
["Id","TimeStamp","Event","DeviceId"])    
#creation of dataframe with 4 events for 1 device
test_df.show()

Вывод:

+---+-------------------+----------+--------+
| Id|          TimeStamp|     Event|DeviceId|
+---+-------------------+----------+--------+
|  1|2019-02-05 08:00:00|   connect|       1|
|  2|2019-02-05 08:00:05|disconnect|       1|
|  3|2019-02-05 08:10:00|   connect|       1|
|  4|2019-02-05 08:20:00|disconnect|       1|
+---+-------------------+----------+--------+

Затем вы можете создать вспомогательные функции и окно:

my_window = Window.partitionBy("DeviceId").orderBy(col("TimeStamp").desc()) #create window
get_prev_time = lag(col("Timestamp"),1).over(my_window)                     #get previous timestamp
time_diff = get_prev_time.cast("long") - col("TimeStamp").cast("long")      #compute duration

test_df.withColumn("EventDuration",time_diff)\
.withColumn("EndDateTime",get_prev_time)\           #apply the helper functions
.withColumnRenamed("TimeStamp","StartDateTime")\    #rename according to your schema
.withColumn("State",when(col("Event")=="connect", "connected").otherwise("disconnected"))\ #create the state column 
.filter(col("EventDuration").isNotNull()).select("Id","StartDateTime","EndDateTime","EventDuration","State","DeviceId").show()
#finally some filtering for the last events, which do not have a previous time

Выход:

+---+-------------------+-------------------+-------------+------------+--------+
| Id|      StartDateTime|        EndDateTime|EventDuration|       State|DeviceId|
+---+-------------------+-------------------+-------------+------------+--------+
|  3|2019-02-05 08:10:00|2019-02-05 08:20:00|          600|   connected|       1|
|  2|2019-02-05 08:00:05|2019-02-05 08:10:00|          595|disconnected|       1|
|  1|2019-02-05 08:00:00|2019-02-05 08:00:05|            5|   connected|       1|
+---+-------------------+-------------------+-------------+------------+--------+
...