Pyspark не имеет функции сдвига, но вы можете работать с оконной функцией lag , которая дает вам строку перед текущей строкой. Первое окно (называемое w) устанавливает значение столбца val
равным 1, если значение столбца pos_neg
равно True
, а значение предыдущего pos_neg
равно False
и 0 в противном случае.
Во втором окне (называемом w2) мы вычисляем накопленную сумму, чтобы получить желаемое
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import Window
df = pd.DataFrame({"b": ['A','A','A','A','B', 'B','B','C','C','D','D', 'D','D','D','D','D','D','D','D','D'],"Sno": [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20],"a": [3,-4,2, -1, -3, -1,-7,-6, 1, 1, -1, 1,4,5,-3,2,3,4, -1, -2]})
df2=spark.createDataFrame(df)
w = Window.partitionBy('b').orderBy('Sno')
w2 = Window.partitionBy('b').rowsBetween(Window.unboundedPreceding, 0).orderBy('Sno')
df2 = df2.withColumn("pos_neg",col("a") <0)
df2 = df2.withColumn('val', F.when((df2.pos_neg == True) & (F.lag('pos_neg', default=False).over(w) == False), 1).otherwise(0))
df2 = df2.withColumn('val', F.sum('val').over(w2))
df2.show()
Выход:
+---+---+---+-------+---+
|Sno| a| b|pos_neg|val|
+---+---+---+-------+---+
| 5| -3| B| true| 1|
| 6| -1| B| true| 1|
| 7| -7| B| true| 1|
| 10| 1| D| false| 0|
| 11| -1| D| true| 1|
| 12| 1| D| false| 1|
| 13| 4| D| false| 1|
| 14| 5| D| false| 1|
| 15| -3| D| true| 2|
| 16| 2| D| false| 2|
| 17| 3| D| false| 2|
| 18| 4| D| false| 2|
| 19| -1| D| true| 3|
| 20| -2| D| true| 3|
| 8| -6| C| true| 1|
| 9| 1| C| false| 1|
| 1| 3| A| false| 0|
| 2| -4| A| true| 1|
| 3| 2| A| false| 1|
| 4| -1| A| true| 2|
+---+---+---+-------+---+
Вы можете задаться вопросом, почему был необходим столбец, позволяющий нам упорядочить набор данных. Позвольте мне попытаться объяснить это на примере. Следующие данные были прочитаны пандами и им был присвоен индекс (левая колонка). Вы хотите сосчитать вхождения True
в pos_neg
и не хотите считать последовательные True
. Эта логика приводит к столбцу val2
, как показано ниже:
b Sno a pos_neg val_2
0 A 1 3 False 0
1 A 2 -4 True 1
2 A 3 2 False 1
3 A 4 -1 True 2
4 A 5 -5 True 2
... но это зависит от индекса, полученного от панд (порядок строк). Когда вы измените порядок строк (и соответствующий индекс панд), вы получите другой результат, когда примените свою логику к одним и тем же строкам только потому, что порядок отличается:
b Sno a pos_neg val_2
0 A 1 3 False 0
1 A 3 2 False 0
2 A 2 -4 True 1
3 A 4 -1 True 1
4 A 5 -5 True 1
Вы видите, что порядок строк важен. Теперь вы можете удивиться, почему pyspark не создает индекс, как pandas. Это связано с тем, что spark хранит ваши данные в нескольких разделах, которые распределены по вашему кластеру, и зависит от вашего источника данных, даже способного считывать данные распределенным образом. Следовательно, индекс не может быть добавлен во время чтения данных. Вы можете добавить один после чтения данных с помощью функции monotonically_increasing_id , но ваши данные могут уже иметь другой порядок по сравнению с вашим источником данных из-за процесса чтения.
Ваш столбец sno
устраняет эту проблему и гарантирует, что вы получите всегда один и тот же результат для одних и тех же данных (детерминированный).