Найти непрерывные данные в фрейме pyspark - PullRequest
1 голос
/ 16 мая 2019

У меня есть кадр данных, который выглядит как

key | value | time | status
x   | 10    | 0    | running
x   | 15    | 1    | running
x   | 30    | 2    | running
x   | 15    | 3    | running
x   | 0     | 4    | stop
x   | 40    | 5    | running
x   | 10    | 6    | running
y   | 10    | 0    | running
y   | 15    | 1    | running
y   | 30    | 2    | running
y   | 15    | 3    | running
y   | 0     | 4    | stop
y   | 40    | 5    | running
y   | 10    | 6    | running
...

Я хочу получить таблицу, которая выглядит как

key | start | end | status | max value
x   | 0     | 3   | running| 30
x   | 4     | 4   | stop   | 0
x   | 5     | 6   | running| 40
y   | 0     | 3   | running| 30
y   | 4     | 4   | stop   | 0
y   | 5     | 6   | running| 40
...

Другими словами, я хочу разделить на key, сортировка по time, в окна, которые имеют одинаковые status, сохраняют первый и последний time и выполняют вычисления для этого окна, то есть max из value

Идеально используя pyspark.

1 Ответ

1 голос
/ 16 мая 2019

Вот один из подходов, который вы можете использовать.

Сначала создайте столбец, чтобы определить, изменился ли status для данного key:

from pyspark.sql.functions import col, lag
from pyspark.sql import Window

w = Window.partitionBy("key").orderBy("time")    

df = df.withColumn(
    "status_change",
    (col("status") != lag("status").over(w)).cast("int")
)
df.show()
#+---+-----+----+-------+-------------+
#|key|value|time| status|status_change|
#+---+-----+----+-------+-------------+
#|  x|   10|   0|running|         null|
#|  x|   15|   1|running|            0|
#|  x|   30|   2|running|            0|
#|  x|   15|   3|running|            0|
#|  x|    0|   4|   stop|            1|
#|  x|   40|   5|running|            1|
#|  x|   10|   6|running|            0|
#|  y|   10|   0|running|         null|
#|  y|   15|   1|running|            0|
#|  y|   30|   2|running|            0|
#|  y|   15|   3|running|            0|
#|  y|    0|   4|   stop|            1|
#|  y|   40|   5|running|            1|
#|  y|   10|   6|running|            0|
#+---+-----+----+-------+-------------+

Затем заполните null с 0 и возьмите совокупную сумму столбца status_change, за key:

from pyspark.sql.functions import sum as sum_  # avoid shadowing builtin

df = df.fillna(0).withColumn(
    "status_group",
    sum_("status_change").over(w)
)
df.show()
#+---+-----+----+-------+-------------+------------+
#|key|value|time| status|status_change|status_group|
#+---+-----+----+-------+-------------+------------+
#|  x|   10|   0|running|            0|           0|
#|  x|   15|   1|running|            0|           0|
#|  x|   30|   2|running|            0|           0|
#|  x|   15|   3|running|            0|           0|
#|  x|    0|   4|   stop|            1|           1|
#|  x|   40|   5|running|            1|           2|
#|  x|   10|   6|running|            0|           2|
#|  y|   10|   0|running|            0|           0|
#|  y|   15|   1|running|            0|           0|
#|  y|   30|   2|running|            0|           0|
#|  y|   15|   3|running|            0|           0|
#|  y|    0|   4|   stop|            1|           1|
#|  y|   40|   5|running|            1|           2|
#|  y|   10|   6|running|            0|           2|
#+---+-----+----+-------+-------------+------------+

Теперь вы можете агрегировать по key и status_group. Вы также можете включить status в groupBy, поскольку он будет одинаковым для каждого status_group. В конце выберите только те столбцы, которые вы хотите использовать в выводе.

from pyspark.sql.functions import min as min_, max as max_

df_agg = df.groupBy("key", "status", "status_group")\
    .agg(
        min_("time").alias("start"), 
        max_("time").alias("end"), 
        max_("value").alias("max_value")
    )\
    .select("key", "start", "end", "status", "max_value")\
    .sort("key", "start")

df_agg.show()
#+---+-----+---+-------+---------+
#|key|start|end| status|max_value|
#+---+-----+---+-------+---------+
#|  x|    0|  3|running|       30|
#|  x|    4|  4|   stop|        0|
#|  x|    5|  6|running|       40|
#|  y|    0|  3|running|       30|
#|  y|    4|  4|   stop|        0|
#|  y|    5|  6|running|       40|
#+---+-----+---+-------+---------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...