Как подсчитать количество смежных значений в кадре данных Pyspark, равное определенному значению, используя функцию скользящего окна? - PullRequest
0 голосов
/ 17 января 2020

Примерный фрейм данных можно создать с помощью:

    from pyspark.sql.functions import col
    from pyspark.sql.window import Window

    df = sc.parallelize([['2019-08-29 01:00:00',0],
                          ['2019-08-29 02:00:00',0],
                          ['2019-08-29 03:00:00',0],
                          ['2019-08-29 04:00:00',1],
                          ['2019-08-29 05:00:00',2],
                          ['2019-08-29 06:00:00',3],
                          ['2019-08-29 07:00:00',0],
                          ['2019-08-29 08:00:00',2],
                          ['2019-08-29 09:00:00',0],
                          ['2019-08-29 10:00:00',1]]).toDF(['DATETIME','VAL']).withColumn('DATETIME',col('DATETIME').cast('timestamp'))

Я хочу создать столбец с числом, равным количеству появлений 0 значений в течение 3-часового периода (+/- 1 час от текущее время, включая текущий Val). Окно может быть создано с помощью:

w1 = (Window()
 .orderBy(col('DATETIME').cast('long'))
 .rangeBetween(-(60*60), 60*60))

Желаемый результат:

+-------------------+---+---+
|           DATETIME|VAL|NUM|
+-------------------+---+---+
|2019-08-29 01:00:00|  0|  2|
|2019-08-29 02:00:00|  0|  3|
|2019-08-29 03:00:00|  0|  2|
|2019-08-29 04:00:00|  1|  1|
|2019-08-29 05:00:00|  2|  0|
|2019-08-29 06:00:00|  3|  1|
|2019-08-29 07:00:00|  0|  1|
|2019-08-29 08:00:00|  2|  2|
|2019-08-29 09:00:00|  0|  1|
|2019-08-29 10:00:00|  1|  1|
+-------------------+---+---+

1 Ответ

1 голос
/ 17 января 2020

Если у вас есть только 1 запись на DATETIME , вы можете использовать функции lead и lag , чтобы получить предыдущее и следующее значения, а затем вы можете считать для нули.

from pyspark.sql.functions import udf, array, col
from pyspark.sql.types import IntegerType

count_zeros_udf = udf(lambda arr: arr.count(0), IntegerType())

df.withColumn('lag1', f.lag(col('VAL'), 1, -1).over(Window.orderBy("DATETIME")))   # Get the previous value
.withColumn('lag2', f.lead(col('VAL'), 1, -1).over(Window.orderBy("DATETIME")))    # Get the next value
.withColumn('NUM', count_zeros_udf(array('VAL', 'lag1', 'lag2')))                  # Count zeros using the udf
.drop('lag1', 'lag2')                                                              # Drop the extra columns
.show()

+-------------------+---+---+
|           DATETIME|VAL|NUM|
+-------------------+---+---+
|2019-08-29 01:00:00|  0|  2|
|2019-08-29 02:00:00|  0|  3|
|2019-08-29 03:00:00|  0|  2|
|2019-08-29 04:00:00|  1|  1|
|2019-08-29 05:00:00|  2|  0|
|2019-08-29 06:00:00|  3|  1|
|2019-08-29 07:00:00|  0|  1|
|2019-08-29 08:00:00|  2|  2|
|2019-08-29 09:00:00|  0|  1|
|2019-08-29 10:00:00|  1|  1|
+-------------------+---+---+

С pyspark> = 2.4 , вы можете использовать UDF с pandas UDF в окне, как описано здесь Определяемая пользователем функция для применения к окну в PySpark? . К сожалению, у меня нет pyspark 2.4 или выше, поэтому я не могу проверить его.

...