Агрегация по частичному фрейму данных в pyspark - PullRequest
1 голос
/ 30 апреля 2020

Можно ли выполнить агрегацию на частичном кадре данных? Или можно эффективно разделить данные по заданным условиям?

Скажите, что у меня есть кадр данных, подобный приведенному ниже:

+-------------+-----------------+-----------+----------------+-----------+
|     epoch_ms|ID               | state     | value 1        | value 2   |
+-------------+-----------------+-----------+----------------+-----------+
|1588119659000| 3489692692      |        3.0|   0.239999     |   11.2699 |   
|1587497991000| 3489692692      |        2.0|   0.159999     |   21.6999 | 
|1587864812000| 3489692692      |        2.0|   0.959999     |   359.649 |  
|1587581329000| 3489692692      |        1.0|   1.039999     |   336.209 |  
|1587581329000| 3489692692      |        3.0|   1.039999     |   336.299 |   
|1587581329000| 3489692692      |        1.0|   2.799999     |   336.209 |   

Существует ли эффективный способ разделения по «событию», предполагая, событие начинается с состояния = 3 и заканчивается состоянием = 1. Я бы хотел, чтобы между этими состояниями были меньшие кадры данных, в этом небольшом случае:

+-------------+-----------------+-----------+----------------+-----------+
|     epoch_ms|ID               | state     | value 1        | value 2   |
+-------------+-----------------+-----------+----------------+-----------+
|1588119659000| 3489692692      |        3.0|   0.239999     |   11.2699 |   
|1587497991000| 3489692692      |        2.0|   0.159999     |   21.6999 | 
|1587864812000| 3489692692      |        2.0|   0.959999     |   359.649 |  
|1587581329000| 3489692692      |        1.0|   1.039999     |   336.209 | 

и

+-------------+-----------------+-----------+----------------+-----------+
|     epoch_ms|ID               | state     | value 1        | value 2   |
+-------------+-----------------+-----------+----------------+-----------+
|1587581329000| 3489692692      |        3.0|   1.039999     |   336.299 |   
|1587581329000| 3489692692      |        1.0|   2.799999     |   336.209 |  

Моя конечная цель - создать еще один фрейм данных, который будет агрегировать значения на основе начальной и конечной эпох, что-то вроде:

+-------------+---------------+-------------+--------------+-------------+
|  ID         |start epoch    |end_epoch    | max(value 1) | max(value 2)|
+-------------+---------------+-------------+--------------+-------------+
|3489692692   |1588119659000  |1587581329000|1.039999      |359.649      |
|3489692692   |1587581329000  |1587581329000|2.799999      |336.299      |

Раньше, когда я не обрабатывал слишком много данных, я использовал pandas для перебирайте фрейм данных и строите новый фрейм данных построчно, но да, это не очень эффективно. Будем благодарны за любые подсказки, указывающие мне правильное направление.

------- ### ОБНОВЛЕНИЕ ### ----------

I предположим, что ниже приведен лучший пример данных, с которыми я работаю:

+-------------+-----------------+-----------+----------------+-----------+
|     epoch_ms|ID               | state     | value 1        | value 2   |
+-------------+-----------------+-----------+----------------+-----------+
|1585766054000| 3489692692      |        3.0|   0.159999     |   7.58996 |
|1585766055000| 3489692692      |        3.0|   0.239999     |   11.2699 |  
|1585766058000| 3489692692      |        3.0|   0.135489     |   13.8790 |
|1587497991000| 3489692692      |        2.0|   0.159999     |   21.6999 | 
|1587864812000| 3489692692      |        2.0|   0.959999     |   359.649 |  
|1587581329000| 3489692692      |        1.0|   1.039999     |   336.209 |  
|1587581339000| 3489692692      |        3.0|   1.039999     |   336.299 | 
|1587581329000| 3489692692      |        1.0|   2.799999     |   336.209 |
|1588088096000| 3489692670      |        3.0|   2.869564     |   285.963 |
|1588088099000| 3489692670      |        2.0|   0.758753     |   299.578 |
|1588088199000| 3489692670      |        1.0|   3.965424     |   5.89677 |

Что следует учитывать:

  • Событие начинается с состояния 3 и заканчивается состоянием 1
  • Состояния могут повторяться, например, состояние 3 или 2 может появляться несколько раз после запуска, но событие должно включать их все, пока не появится состояние 1.
  • Другие состояния после состояния 1 могут возникнуть , укажите одно или несколько состояний, но следующее событие не начнется, пока состояние снова не станет равным трем, все, что находится между состояниями 1 и 3 (конец предыдущего события и начало нового события), следует игнорировать.
  • Если фрейм данных заканчивается состоянием, отличным от 3, следует предположить, что в конце происходит три.
  • Возможно несколько идентификаторов, а фрейм данных упорядочен как по эпохе, так и по id.

Результаты для приведенного выше примера должны быть примерно такими:

+-------------+---------------+-------------+--------------+-------------+
|  ID         |start epoch    |end_epoch    | max(value 1) | max(value 2)|
+-------------+---------------+-------------+--------------+-------------+
|3489692692   |1585766054000  |1587581329000|1.039999      |359.649      |
|3489692692   |1587581339000  |1587581329000|2.799999      |336.299      |
|3489692670   |1588088096000  |1588088199000|3.965424      |299.578      |

1 Ответ

1 голос
/ 01 мая 2020

Разделение будет нелогичным, вы должны express свои логики c, используя pyspark in-built функции агрегирования (window + groupBy). Пока данные упорядочены так, как вы их представили, код будет работать нормально (, потому что невозможно определить порядок, поскольку для некоторых строк у вас разные epoch_ms для одного и того же состояния (строка 2,3 ) И логика c состоит в том, чтобы использовать incremental sum, используя условие состояния для вывода sh вашей группировки для start/end. Попробуйте это и lmk.

df.show() #sampledata
#+-------------+----------+-----+--------+-------+
#|     epoch_ms|        ID|state| value 1|value 2|
#+-------------+----------+-----+--------+-------+
#|1588119659000|3489692692|  3.0|0.239999|11.2699|
#|1587497991000|3489692692|  2.0|0.159999|21.6999|
#|1587864812000|3489692692|  2.0|0.959999|359.649|
#|1587581329000|3489692692|  1.0|1.039999|336.209|
#|1587581329000|3489692692|  3.0|1.039999|336.299|
#|1587581329000|3489692692|  1.0|2.799999|336.209|
#+-------------+----------+-----+--------+-------+

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w=Window().partitionBy("ID").orderBy(F.lit(1))
w2=Window().partitionBy("ID").orderBy("rowNum")

df.withColumn("rowNum", F.row_number().over(w))\
  .withColumn("inc_sum", F.sum(F.when(F.col("state")==3,F.lit(1)).otherwise(F.lit(0))).over(w2))\
  .groupBy("inc_sum").agg(F.first("ID").alias("ID"),\
                          F.max("epoch_ms").alias("start_epoch"),\
                          F.min("epoch_ms").alias("end_epoch"),F.max("value 1").alias("max_value1"),\
                          F.max("value 2").alias("max_value2")).drop("inc_sum").show()

#+-------+----------+-------------+-------------+----------+----------+
#|inc_sum|        ID|  start_epoch|    end_epoch|max_value1|max_value2|
#+-------+----------+-------------+-------------+----------+----------+
#|      1|3489692692|1588119659000|1587497991000|  1.039999|   359.649|
#|      2|3489692692|1587581329000|1587581329000|  2.799999|   336.299|
#+-------+----------+-------------+-------------+----------+----------+

UPDATE:

Попробуйте это. Я использую lag condition!=3 with state=3 условие для single out the start of the event, а затем incremental sum, чтобы получить наши группы.

df.show() #sampledata
#+-------------+----------+-----+--------+-------+
#|     epoch_ms|        ID|state| value 1|value 2|
#+-------------+----------+-----+--------+-------+
#|1585766054000|3489692692|  3.0|0.159999|7.58996|
#|1585766055000|3489692692|  3.0|0.239999|11.2699|
#|1585766058000|3489692692|  3.0|0.135489| 13.879|
#|1587497991000|3489692692|  2.0|0.159999|21.6999|
#|1587864812000|3489692692|  2.0|0.959999|359.649|
#|1587581329000|3489692692|  1.0|1.039999|336.209|
#|1587581339000|3489692692|  3.0|1.039999|336.299|
#|1587581329000|3489692692|  1.0|2.799999|336.209|
#|1588088096000|3489692670|  3.0|2.869564|285.963|
#|1588088099000|3489692670|  2.0|0.758753|299.578|
#|1588088199000|3489692670|  1.0|3.965424|5.89677|
#+-------------+----------+-----+--------+-------+

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w=Window().orderBy("rowNum")

df.withColumn("rowNum", F.monotonically_increasing_id())\
  .withColumn("inc_sum", F.sum(F.when((F.col("state")==3) & (F.lag("state").over(w)!=3)\
                                      ,F.lit(1)).otherwise(F.lit(0)))\
                                       .over(w))\
    .groupBy("inc_sum").agg(F.first("ID").alias("ID"),\
                          F.first("epoch_ms").alias("start_epoch"),\
                          F.last("epoch_ms").alias("end_epoch"),F.max("value 1").alias("max_value1"),\
                          F.max("value 2").alias("max_value2")).drop("inc_sum").show()

#+----------+-------------+-------------+----------+----------+
#|        ID|  start_epoch|    end_epoch|max_value1|max_value2|
#+----------+-------------+-------------+----------+----------+
#|3489692692|1585766054000|1587581329000|  1.039999|   359.649|
#|3489692692|1587581339000|1587581329000|  2.799999|   336.299|
#|3489692670|1588088096000|1588088199000|  3.965424|   299.578|
#+----------+-------------+-------------+----------+----------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...