У меня есть набор данных показаний от разных датчиков.Цель состоит в том, чтобы получить среднее изменение и стандартное отклонение каждые 10 секунд по всем датчикам (т. Е. Конечный результат представляет собой просто метку времени, среднее изменение и стандартное отклонение изменения)
Поскольку данные настолько велики, яПервоначально использовать Spark для обработки 200 миллионов строк:
Исходные данные выглядят следующим образом:
>>> df.show(10, False)
+--------------+-----------------------+-----------+-------+
+integerdate | event_timestamp| sensor_id|reading|
+--------------+-----------------------+-----------+-------+
|20180703 |2018-07-03 10:32:50.473| Front| 54.82|
|20180703 |2018-07-03 15:59:50.616| Front| 54.54|
|20180703 |2018-07-03 14:49:55.718| Front| 54.64|
|20180703 |2018-07-03 09:30:00.003| Bore| 55.60|
|20180703 |2018-07-03 15:08:16.099| Bore| 54.66|
|20180703 |2018-07-03 09:30:54.837| Atten| 57.08|
|20180703 |2018-07-03 09:40:24.333| Atten| 57.08|
|20180703 |2018-07-03 10:06:01.027| Atten| 56.69|
|20180703 |2018-07-03 10:06:28.787| Atten| 56.70|
|20180703 |2018-07-03 10:14:32.675| Atten| 56.64|
+--------------+-----------------------+-----------+-------+
Однако, поскольку меня интересует только изменение показаний, я использую окно Spark ианалитическая функция lag
для получения первого показания для каждого датчика в каждый день и только в случае изменения показаний датчика:
>>> df1 = df\
.withColumn('previous_reading',
F.lag(df.reading, 1)
.over(Window.partitionBy('integerdate', 'sensor_id')
.orderBy(df.event_timestamp)))\
.filter((F.col('previous_reading').isNull()) | (F.col('reading') != F.col('previous_reading')))\
.withColumn('reading_change', F.bround(df.reading - F.col('previous_reading'), 2))\
.withColumn('previous_timestamp',
F.lag(df.event_timestamp, 1)
.over(Window
.partitionBy('integerdate', 'sensor_id')
.orderBy(df.event_timestamp)))\
.withColumn('seconds_elapsed',
time_elapsed_udf(F.struct(df.event_timestamp,
F.col('previous_timestamp'))))
>>> df1.show(10, False)
+-----------+-----------------------+-----------+-------+----------------+--------------+-----------------------+---------------+
|integerdate|event_timestamp |sensor_id |reading|previous_reading|reading_change|previous_timestamp |seconds_elapsed|
+-----------+-----------------------+-----------+-------+----------------+--------------+-----------------------+---------------+
|20180703 |2018-07-03 09:32:00.972|Back | 0.365|null |null |null |null |
|20180703 |2018-07-03 09:36:04.096|Anter | 0.210|null |null |null |null |
|20180703 |2018-07-03 11:59:17.118|Anter | 0.250|0.21 |0.04 |2018-07-03 09:36:04.096|8593 |
|20180703 |2018-07-03 12:47:40.309|Alloc | 47.99|null |null |null |null |
|20180703 |2018-07-03 08:00:13.931|Bore | 2.730|null |null |null |null |
|20180703 |2018-07-03 09:30:00.003|Bore | 2.750|2.73 |0.02 |2018-07-03 08:00:13.931|5386 |
|20180703 |2018-07-03 09:30:00.003|Bore | 2.710|2.75 |-0.04 |2018-07-03 09:30:00.003|0 |
|20180703 |2018-07-03 09:30:00.697|Bore | 2.780|2.71 |0.07 |2018-07-03 09:30:00.003|0 |
|20180703 |2018-07-03 09:32:47.269|Bore | 2.730|2.78 |-0.05 |2018-07-03 09:30:00.697|166 |
|20180703 |2018-07-03 09:34:50.814|Bore | 2.760|2.73 |0.03 |2018-07-03 09:32:47.269|123 |
+-----------+-----------------------+-----------+-------+----------------+--------------+-----------------------+---------------+
Это уменьшает общее количество строк примерно до 20 миллионов, что я легко могупроцесс в пандах.
>>> pd_df = df1.toPandas()
>>> pd_df.head(10)
integerdate event_timestamp sensor_id reading previous_reading reading_change previous_timestamp seconds_elapsed
0 20180703 2018-07-03 09:32:00.972 Back 0.365 NaN NaN NaT NaN
1 20180703 2018-07-03 09:36:04.096 Anter 0.210 NaN NaN NaT NaN
2 20180703 2018-07-03 11:59:17.118 Anter 0.250 0.21 0.04 2018-07-03 09:36:04.096 8593
3 20180703 2018-07-03 12:47:40.309 Alloc 47.99 NaN NaN NaT NaN
4 20180703 2018-07-03 08:00:13.931 Bore 2.730 NaN NaN NaT NaN
5 20180703 2018-07-03 09:30:00.003 Bore 2.750 2.73 0.02 2018-07-03 08:00:13.931 5386
6 20180703 2018-07-03 09:30:00.003 Bore 2.710 2.75 -0.04 2018-07-03 09:30:00.003 0
7 20180703 2018-07-03 09:30:00.697 Bore 2.780 2.71 0.07 2018-07-03 09:30:00.003 0
8 20180703 2018-07-03 09:32:47.269 Bore 2.730 2.78 -0.05 2018-07-03 09:30:00.697 166
9 20180703 2018-07-03 09:34:50.814 Bore 2.760 2.73 0.03 2018-07-03 09:32:47.269 123
Я думаю, что мне нужно только event_timestamp
, sensor_id
, reading
и использовать панд resample()
и apply()
, чтобы получать изменения в чтении каждые 10 секунд.
Я решил увидеть previous_reading
, previous_timestamp
и time_elapsed
в своем преобразовании Spark, чтобы убедиться, что окно и lag()
работают правильно.
Теперь проблема в том, чтодля некоторых датчиков показания изменяются каждые несколько микросекунд или секунд, но для некоторых датчиков показания не меняются в течение многих часов.
Как я могу использовать панды resample()
, чтобы получать изменения в показаниях каждые 10 секунд изатем найти среднее изменение и стандартное отклонение изменения показаний датчика каждые 10 секунд?
Окончательный результат должен быть в следующем формате:
event_timestamp avg_change std_dev
2018-07-03 09:00:10 0.05 0.02
2018-07-03 09:00:20 0.21 0.01
2018-07-03 09:00:30 0.58 0.12
2018-07-03 09:00:40 0.71 0.45
2018-07-03 09:00:50 1.14 0.78
2018-07-03 09:01:00 1.05 0.79
2018-07-03 09:01:10 5.05 0.24
2018-07-03 09:01:20 1.96 0.30
2018-07-03 09:01:30 0.51 0.01
2018-07-03 09:01:40 0.14 0.02
Дайте мне знать, если вам нужна дополнительная информация отменя