Найти следующее отличное значение от задержки в pyspark - PullRequest
1 голос
/ 21 апреля 2020

У меня есть такой фрейм данных pyspark,

+-----+----------+
|value|val_joined|
+-----+----------+
|    3|         3|
|    4|       3+4|
|    5|     3+4+5|
|    5|     3+4+5|
|    5|     3+4+5|
|    2|   3+4+5+2|
+-----+----------+

Из этого мне нужно создать еще один столбец, который выглядит следующим образом:

+-----+----------+------+
|value|val_joined|result|
+-----+----------+------+
|    3|         3|   4.0|
|    4|       3+4|   5.0|
|    5|     3+4+5|   2.0|
|    5|     3+4+5|   2.0|
|    5|     3+4+5|   2.0|
|    2|   3+4+5+2|   NaN|
+-----+----------+------+

Необходимо создать столбец результатов например, для элемента в столбце с именем value найдите следующий элемент в порядке. Таким образом, для значения 3 это будет 4, а для значения 4 - 5.

Но при наличии дубликатов, таких как значение 5, которое повторяется 3 раза, простая задержка не будет работать. Поскольку запаздывание для первых 5 приведет к 5. Я в основном хочу повторить взятие запаздывания, пока значение! = Запаздывание (значение) или запаздывание (значение) не станет равным нулю.

Как я могу сделать это в pyspark без udf и объединений?

1 Ответ

1 голос
/ 21 апреля 2020

Мы можем взять 2 windows и найти значение следующей строки один раз в 1-м окне, назначив monotonically_increasing_id и значение last в другом окне, как показано ниже:

import pyspark.sql.functions as F
w = Window.orderBy('idx')
w1 = Window.partitionBy('value')

(df.withColumn('idx',F.monotonically_increasing_id())
.withColumn("result",F.last(F.lead("value").over(w)).over(w1)).orderBy('idx')
.drop('idx')).show()

+-----+----------+------+
|value|val_joined|result|
+-----+----------+------+
|    3|         3|     4|
|    4|       3+4|     5|
|    5|     3+4+5|     2|
|    5|     3+4+5|     2|
|    5|     3+4+5|     2|
|    2|   3+4+5+2|  null|
+-----+----------+------+

Если числа в значении могут повторяться позже, в следующем примере:

+-----+----------+
|value|val_joined|
+-----+----------+
|3    |3         |
|4    |3+4       |
|5    |3+4+5     |
|5    |3+4+5     |
|5    |3+4+5     |
|2    |3+4+5+2   |
|5    |3+4+5+2+5 | <- this value is repeated later
+-----+----------+

Тогда нам нужно будет создать отдельную группу и взять группу в качестве окна:

w = Window.orderBy('idx')
w1 = Window.partitionBy('group')

(df.withColumn('idx',F.monotonically_increasing_id())
  .withColumn("lag", F.when(F.lag("value").over(w)!=F.col("value"), F.lit(1))
  .otherwise(F.lit(0)))
  .withColumn("group", F.sum("lag").over(w) + 1).drop("lag")
  .withColumn("result",F.last(F.lead("value").over(w)).over(w1)).orderBy('idx')
  .drop('idx',"group")).show()

+-----+----------+------+
|value|val_joined|result|
+-----+----------+------+
|    3|         3|     4|
|    4|       3+4|     5|
|    5|     3+4+5|     2|
|    5|     3+4+5|     2|
|    5|     3+4+5|     2|
|    2|   3+4+5+2|     5|
|    5| 3+4+5+2+5|  null|
+-----+----------+------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...