разбить на 24 часа и агрегировать с помощью pyspark или panda - PullRequest
2 голосов
/ 21 апреля 2020

У меня есть данные сеанса для каждого устройства, как показано ниже

время начала: отметка времени, когда устройство подключено

длительность (секунды): сколько времени оно подключено

пакеты: сколько отправленных пакетов

теперь мне нужно агрегировать (суммировать) продолжительность и пакет для каждого раздела устройства по 24 часам.

например:

Для первой записи,

Устройство A, запущенное по времени 8 апреля 1:53, поэтому необходимо объединить все устройства (A), действительные в течение 24 часов, то есть до 9 апреля 1:53.

Затем следующий запись для устройства А должна начинаться с 9 апреля 4:27, поэтому необходимо объединить все устройства (А), действительные в течение 24 часов, то есть до 10 апреля 4:27.

и

Так для каждого устройства.

enter image description here

ожидаемый выход

enter image description here

Тестовые данные:

dftest = sc.parallelize([['A','2020-04-08T01:53:54.932000','Org1','wifi',60,372717],
                      ['A','2020-04-08T02:40:38.661000','Org1','wifi',194,819040],
                       ['A','2020-04-08T21:45:10.207000','Org1','wifi',8885,3449150],
                        ['A','2020-04-09T00:15:28.838000','Org1','wifi',14770,3572589],
                         ['A','2020-04-09T04:27:33.424000','Org1','remote',0,0],
                          ['A','2020-04-09T04:29:25.189000','Org1','wifi',60,7495],
                           ['A','2020-04-09T04:44:21.397000','Org1','remote',60,553356],
                            ['A','2020-04-09T04:50:40.406000','Org1','wifi',60,662467],
                             ['A','2020-04-10T00:00:50.636000','Org1','remote',0,72],
                              ['A','2020-04-10T04:20:28.831000','Org1','remote',6,497],
                               ['A','2020-04-10T04:31:35.336000','Org1','remote',0,22],
                                ['B','2020-04-08T21:56:58.502000','Org2','remote',0,0],
                                 ['B','2020-04-08T22:01:19.534000','Org2','wifi',0,0],
                                  ['B','2020-04-08T22:10:15.891000','Org2','wifi',60,187891],
                                   ['B','2020-04-08T22:16:41.499000','Org2','wifi',1620,207674],
                                    ['B','2020-04-09T01:55:02.772000','Org2','wifi',360,426232],
                                     ['B','2020-04-09T02:03:32.735000','Org2','wifi',60,374827],
                                      ['B','2020-04-09T02:06:16.509000','Org2','wifi',60,386518],
                                       ['B','2020-04-09T02:13:33.497000','Org2','remote',60,373609],
                                        ['B','2020-04-09T02:17:19.176000','Org2','wifi',133,400417],
                                         ['B','2020-04-10T23:10:15.654000','Org2','remote',0,212],
                                          ['B','2020-04-10T23:10:41.749000','Org2','remote',1,285]
                    ]).toDF(("deviceId","time-started","OrgId","type","duration","packet"))
dftest.show()

1 Ответ

1 голос
/ 22 апреля 2020

Для вашего случая ваше следующее 24 hour зависит от end of the last one и date after that last date, поэтому мы не можем express это logi c только с оконными функциями . Я decoupled вычисление суммы из pandas (как это было бы медленно) и использовало spark in-built functions to get your sum, а pandas udaf в основном дает нам desired date groups, и мы filter на них, чтобы получить желаемый результат.

Итерация - единственный способ получить ваши 24-часовые сегменты , так что вы также можете использовать простой udf, но векторизованный udaf pandas позволяет нам express наши логи c для небольших групп (на основе идентификатора), поэтому должно быть лучше. Pandas-udaf(spark2.3+)

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import functions as F
from pyspark.sql.window import Window

w=Window().partitionBy("deviceId").orderBy(F.col("time-started").cast("long")).rangeBetween(Window.currentRow,24*60*60)
df2=df.withColumn("time-started", F.to_timestamp("time-started", "yyyy-MM-dd'T'HH:mm:ss"))\
      .withColumn("time-started-2", F.col("time-started"))\
      .withColumn("duration", F.sum("duration").over(w))\
      .withColumn("packet", F.sum("packet").over(w))

@pandas_udf(df2.schema, PandasUDFType.GROUPED_MAP)
def grouped_map(df1):
   start=df1.loc[0, 'time-started']
   for i in range(1, len(df1)):
        if start + pd.Timedelta(days=1)>df1.loc[i,'time-started']:
             df1.loc[i,'time-started']=start
        else:
             start=df1.loc[i,'time-started']    


   return df1
df2.groupby('deviceId').apply(grouped_map)\
.filter(F.col("time-started-2")==F.col("time-started"))\
.drop("time-started-2")\
.orderBy("deviceId")\
.show()

#+--------+-------------------+-----+------+--------+-------+
#|deviceId|       time-started|OrgId|  type|duration| packet|
#+--------+-------------------+-----+------+--------+-------+
#|       A|2020-04-08 01:53:54| Org1|  wifi|   23909|8213496|
#|       A|2020-04-09 04:27:33| Org1|remote|     186|1223887|
#|       A|2020-04-10 04:31:35| Org1|remote|       0|     22|
#|       B|2020-04-08 21:56:58| Org2|remote|    2353|2357168|
#|       B|2020-04-10 23:10:15| Org2|remote|       1|    497|
#+--------+-------------------+-----+------+--------+-------+

Вы также можете взглянуть на похожие вопрос . Предложенное решение было scala udf с использованием функции foldleft. Я думаю pandas сгруппированная карта udaf будет лучшей альтернативой.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...