Перечисление последовательностей в отсортированном фрейме данных PySpark - PullRequest
0 голосов
/ 25 февраля 2020

У меня есть Spark DF, к которому я пытаюсь применить дополнительную группировку. Это отсортированный кадр данных в виде

--------------------------------------
| id |   timestamp    | head_indices |
--------------------------------------
| 1  |     23         | 1            |
| 1  |     24         | 0            |
| 1  |     25         | 0            |
| 1  |     55         | 1            |
| 1  |     56         | 0            |
| 1  |     57         | 0            |
| 1  |     58         | 0            |
| 1  |     75         | 1            |
| 1  |     77         | 0            |
| 2  |     43         | 1            |
| 2  |     44         | 0            |
| 2  |     45         | 0            |
--------------------------------------

. В этих данных каждый id имеет некоторое количество смежных строк, где начало каждой последовательности обозначается head_indices. Я пытаюсь использовать head_indices для перечисления смежных последовательностей в новый столбец, чтобы преобразование впоследствии выглядело так:

-----------------------------------------------
| id |   timestamp    | head_indices | seq_id |
-----------------------------------------------
| 1  |     23         | 1            | 1      |
| 1  |     24         | 0            | 1      |
| 1  |     25         | 0            | 1      |
| 1  |     55         | 1            | 2      |
| 1  |     56         | 0            | 2      |
| 1  |     57         | 0            | 2      |
| 1  |     58         | 0            | 2      |
| 1  |     75         | 1            | 3      |
| 1  |     77         | 0            | 3      |
| 2  |     43         | 1            | 1      |
| 2  |     44         | 0            | 1      |
| 2  |     45         | 0            | 1      |
-----------------------------------------------

Где seq_id представляет индекс последовательности, разделенный id.

Любое руководство будет приветствоваться

1 Ответ

2 голосов
/ 26 февраля 2020

Попробуй это. Я использовал 10 в качестве диапазона между безопасными мерами, если число последовательных временных меток больше 10 или разница между временными метками меньше 10, тогда нужно будет изменить 10. Это более динамично c, чем мой последний ответ.

w=Window().partitionBy("id", "head_indices").orderBy("timestamp")
w2=Window().partitionBy("id").orderBy(F.col("timestamp")).rangeBetween(-10,0)
df.withColumn("head1", F.row_number().over(w))\
.withColumn("head2", F.when(F.col("head_indices")==1, F.col("head1")).otherwise(F.col("head_indices")))\
.withColumn("seq_id", F.first("head2").over(w2))\
.drop("head1","head2")\
.orderBy("id","timestamp").show()

+---+---------+------------+------+
| id|timestamp|head_indices|seq_id|
+---+---------+------------+------+
|  1|       23|           1|     1|
|  1|       24|           0|     1|
|  1|       25|           0|     1|
|  1|       55|           1|     2|
|  1|       56|           0|     2|
|  1|       57|           0|     2|
|  1|       75|           1|     3|
|  1|       76|           0|     3|
|  1|       77|           0|     3|
|  2|       43|           1|     1|
|  2|       44|           0|     1|
|  2|       45|           0|     1|
+---+---------+------------+------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...