Pyspark: рассчитать серию последовательных наблюдений - PullRequest
0 голосов
/ 30 января 2019

У меня есть фрейм данных Spark (2.4.0) со столбцом, который имеет только два значения (0 или 1).Мне нужно вычислить полосу последовательных 0 с и 1 с в этих данных, сбросив полосу на ноль, если значение изменится.

Пример:

from pyspark.sql import (SparkSession, Window)
from pyspark.sql.functions import (to_date, row_number, lead, col)

spark = SparkSession.builder.appName('test').getOrCreate()

# Create dataframe
df = spark.createDataFrame([
    ('2018-01-01', 'John', 0, 0),
    ('2018-01-01', 'Paul', 1, 0),
    ('2018-01-08', 'Paul', 3, 1),
    ('2018-01-08', 'Pete', 4, 0),
    ('2018-01-08', 'John', 3, 0),
    ('2018-01-15', 'Mary', 6, 0),
    ('2018-01-15', 'Pete', 6, 0),
    ('2018-01-15', 'John', 6, 1),
    ('2018-01-15', 'Paul', 6, 1),
], ['str_date', 'name', 'value', 'flag'])

df.orderBy('name', 'str_date').show()
## +----------+----+-----+----+
## |  str_date|name|value|flag|
## +----------+----+-----+----+
## |2018-01-01|John|    0|   0|
## |2018-01-08|John|    3|   0|
## |2018-01-15|John|    6|   1|
## |2018-01-15|Mary|    6|   0|
## |2018-01-01|Paul|    1|   0|
## |2018-01-08|Paul|    3|   1|
## |2018-01-15|Paul|    6|   1|
## |2018-01-08|Pete|    4|   0|
## |2018-01-15|Pete|    6|   0|
## +----------+----+-----+----+

СПо этим данным я хотел бы рассчитать серию последовательных нулей и единиц, упорядоченных по дате и «оконных» по имени:

# Expected result:
## +----------+----+-----+----+--------+--------+
## |  str_date|name|value|flag|streak_0|streak_1|
## +----------+----+-----+----+--------+--------+
## |2018-01-01|John|    0|   0|       1|       0|
## |2018-01-08|John|    3|   0|       2|       0|
## |2018-01-15|John|    6|   1|       0|       1|
## |2018-01-15|Mary|    6|   0|       1|       0|
## |2018-01-01|Paul|    1|   0|       1|       0|
## |2018-01-08|Paul|    3|   1|       0|       1|
## |2018-01-15|Paul|    6|   1|       0|       2|
## |2018-01-08|Pete|    4|   0|       1|       0|
## |2018-01-15|Pete|    6|   0|       2|       0|
## +----------+----+-----+----+--------+--------+

Конечно, мне понадобится полоса, чтобы обнулить себя, если«флаг» меняется.

Есть ли способ сделать это?

1 Ответ

0 голосов
/ 30 января 2019

Это потребовало бы разницы в подходе к номерам строк для последовательных строк первой группы с одинаковым значением, а затем с использованием подхода ранжирования между группами.

from pyspark.sql import Window 
from pyspark.sql import functions as f
#Windows definition
w1 = Window.partitionBy(df.name).orderBy(df.date)
w2 = Window.partitionBy(df.name,df.flag).orderBy(df.date)

res = df.withColumn('grp',f.row_number().over(w1)-f.row_number().over(w2))
#Window definition for streak
w3 = Window.partitionBy(res.name,res.flag,res.grp).orderBy(res.date)
streak_res = res.withColumn('streak_0',f.when(res.flag == 1,0).otherwise(f.row_number().over(w3))) \
                .withColumn('streak_1',f.when(res.flag == 0,0).otherwise(f.row_number().over(w3)))
streak_res.show()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...