Как пометить последние строки из окна с помощью Pyspark - PullRequest
0 голосов
/ 30 января 2020

Моя цель - создать новый столбец is_end (когда последний и предыдущий p_uuid isNull (), затем is_end = 1, иначе = 0. Я не знаю, как объединить функции When () и last ().

Я несколько раз пытался объединить с windows, но всегда с ошибками: (

df = spark.createDataFrame([
                        (1, 110, None, '2019-09-28'),
                        (2, 110, None, '2019-09-28'),
                        (3, 110, 'aaa', '2019-09-28'),
                        (4, 110, None, '2019-09-17'),
                        (5, 110, None, '2019-09-17'),
                        (6, 110, 'bbb', '2019-09-17'),
                        (7, 110, None, '2019-09-01'),
                        (8, 110, None, '2019-09-01'),
                        (9, 110, None, '2019-09-01'),
                        (10, 110, None, '2019-09-01'),
                        (11, 110, 'ccc', '2019-09-01'),
                        (12, 110, None, '2019-09-01'),
                        (13, 110, None, '2019-09-01'),
                        (14, 110, None, '2019-09-01')
                    ],
                    ['idx', 'u_uuid', 'p_uuid', 'timestamp']
                )
df.show()

Мой фрейм данных:

+---+------+------+----------+
|idx|u_uuid|p_uuid| timestamp|
+---+------+------+----------+
|  1|   110|  null|2019-09-28|
|  2|   110|  null|2019-09-28|
|  3|   110|   aaa|2019-09-28|
|  4|   110|  null|2019-09-17|
|  5|   110|  null|2019-09-17|
|  6|   110|   bbb|2019-09-17|
|  7|   110|  null|2019-09-01|
|  8|   110|  null|2019-09-01|
|  9|   110|  null|2019-09-01|
| 10|   110|  null|2019-09-01|
| 11|   110|   ccc|2019-09-01|
| 12|   110|  null|2019-09-01|
| 13|   110|  null|2019-09-01|
| 14|   110|  null|2019-09-01|
+---+------+------+----------+

w = Window.partitionBy("u_uuid").orderBy(col("timestamp"))
df.withColumn("p_uuid", when( lag(F.col("p_uuid").isNull()).over(w), 1).otherwise(0))

Что я ищу:

+---+------+------+----------+------+
|idx|u_uuid|p_uuid| timestamp|is_end|
+---+------+------+----------+------+
|  1|   110|  null|2019-09-28|     0|
|  2|   110|  null|2019-09-28|     0|
|  3|   110|   aaa|2019-09-28|     0|
|  4|   110|  null|2019-09-17|     0|
|  5|   110|  null|2019-09-17|     0|
|  6|   110|   bbb|2019-09-17|     0|
|  7|   110|  null|2019-09-01|     0|
|  8|   110|  null|2019-09-01|     0|
|  9|   110|  null|2019-09-01|     0|
| 10|   110|  null|2019-09-01|     0|
| 11|   110|   ccc|2019-09-01|     0|
| 12|   110|  null|2019-08-29|     1|
| 13|   110|  null|2019-08-29|     1|
| 14|   110|  null|2019-08-29|     1|

1 Ответ

1 голос
/ 31 января 2020

Сильфон - это pyspark sql ассоциируется с вашим делом:

w = (Window
    .partitionBy("u_uuid")
    .orderBy("timestamp"))
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

df.withColumn("is_end", F.when(F.last("p_uuid", True).over(w).isNull() & F.col("p_uuid").isNull(), F.lit(1)).otherwise(F.lit(0)))\
    .show()
...