PySpark: вычислить значения в столбце X на основе предыдущих значений в столбце Y и ID - PullRequest
0 голосов
/ 02 мая 2019

У меня есть три столбца в фрейме данных PySpark: ID, Y и пустой столбец X.

data = spark.read.options(sep="|", header="true", inferschema="true").csv(csv_file)
data.createOrReplaceTempView("TABLE_NAME")
df = spark.sql("SELECT ID, Y, X FROM TABLE_NAME")
df

ID: строка, Y: int, X: int

+----+-----+---+
| ID |  Y  | X |
+----+-----+---+
| V1 |   0 |   |
| V2 |   0 |   |
| V1 | 100 |   |
| V2 | 100 |   |
| V1 | 250 |   |
| V2 | 400 |   |
+----+-----+---+

Я хотел бы рассчитать X как ноль, если текущая строка является первым вхождением идентификатора, или как разницу между Y в текущем вхождении идентификатора и Y в последнем вхождении идентификатора:

+----+-----+-----+
| ID |  Y  |  X  |
+----+-----+-----+
| V1 |   0 |   0 |
| V2 |   0 |   0 |
| V1 | 100 | 100 |
| V2 | 100 | 100 |
| V1 | 250 | 150 |
| V2 | 400 | 300 |
+----+-----+-----+

Не могли бы вы помочь мне достичь этого?

Мне очень жаль, я не мог найти, как читать данные, вводя их вручную, вот мой csv_file для воспроизводимости:

ID|Y|Date
V1|0|2018-06-22 08:33:05
V2|0|2018-06-22 08:33:05
V1|100|2018-06-22 08:34:05
V2|100|2018-06-22 08:34:05
V1|250|2018-06-22 08:35:05
V2|400|2018-06-22 08:35:05
V2|-50|2018-06-22 08:36:05
V2|400|2018-06-22 08:37:05

Версия Spark: 2.4.0

РЕДАКТИРОВАТЬ: Использование решения Стивена с дополнительными строками:

+---+---+----+
| id|  y|   x|
+---+---+----+
| v2|-50|   0|
| v2|  0|  50|
| v2|100| 150|
| v2|400| 350|
| v2|400| -50|
| v2|400|-450|
| v1|  0|   0|
| v1|100| 100|
| v1|250| 150|
+---+---+----+

Желаемый результат:

+----+-----+-----+---------------------+
| id |  y  |  x  |        Date         |
+----+-----+-----+---------------------+
| v2 |   0 |   0 | 2018-06-22 08:33:05 |
| v2 | 100 | 100 | 2018-06-22 08:34:05 |
| v2 | 400 | 300 | 2018-06-22 08:35:05 |
| v2 | -50 | -450 | 2018-06-22 08:36:05 |
| v2 | 400 |  450 | 2018-06-22 08:37:05 |
| v1 |   0 |   0 | 2018-06-22 08:33:05 |
| v1 | 100 | 100 | 2018-06-22 08:34:05 |
| v1 | 250 | 150 | 2018-06-22 08:35:05 |
+----+-----+-----+---------------------+

С orderBy («Дата»):

+---+---+-------------------+----+
| id|  y|               Date|   x|
+---+---+-------------------+----+
| v2|  0|2018-06-22 08:33:05|   0|
| v2|100|2018-06-22 08:34:05| 100|
| v2|400|2018-06-22 08:35:05| 300|
| v2|-50|2018-06-22 08:36:05|-550|
| v2|400|2018-06-22 08:37:05| -50|
| v1|  0|2018-06-22 08:33:05|   0|
| v1|100|2018-06-22 08:34:05| 100|
| v1|250|2018-06-22 08:35:05| 150|
+---+---+-------------------+----+

1 Ответ

1 голос
/ 02 мая 2019

вот ваш датафрейм:

df.show()                                                                                                    

+---+---+-------------------+                                                   
| id|  y|               date|
+---+---+-------------------+
| V1|  0|2018-06-22 08:33:05|
| V2|  0|2018-06-22 08:33:05|
| V1|100|2018-06-22 08:34:05|
| V2|100|2018-06-22 08:34:05|
| V1|250|2018-06-22 08:35:05|
| V2|400|2018-06-22 08:35:05|
| V2|-50|2018-06-22 08:36:05|
| V2|400|2018-06-22 08:37:05|
+---+---+-------------------+

Вы можете получить свой результат, используя lag:

from pyspark.sql import Window, functions as F

df.withColumn(
    "x", 
    F.coalesce(
        F.col("y") 
        - F.lag("y").over(
            Window.partitionBy(
                "id"
            ).orderBy(
                "date"
            )
        ), 
        F.lit(0)
    )
).show()  

+---+---+-------------------+------+                                            
| id|  y|               date|     x|
+---+---+-------------------+------+
| V2|  0|2018-06-22 08:33:05|   0.0|
| V2|100|2018-06-22 08:34:05| 100.0|
| V2|400|2018-06-22 08:35:05| 300.0|
| V2|-50|2018-06-22 08:36:05|-450.0|
| V2|400|2018-06-22 08:37:05| 450.0|
| V1|  0|2018-06-22 08:33:05|   0.0|
| V1|100|2018-06-22 08:34:05| 100.0|
| V1|250|2018-06-22 08:35:05| 150.0|
+---+---+-------------------+------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...