В pyspark сгенерируйте минимальное значение для оконного раздела на основе значения двух столбцов, переменных и последовательных отрицательных значений - PullRequest
0 голосов
/ 11 июля 2019

Создает rdd, имеющий столбец 'a', который имеет сочетание положительных и отрицательных значений

df = pd.DataFrame({"b": ['A','A','A','A','B', 'B','B','C','C','D','D', 'D','D','D','D','D','D','D','D','D'],
                   "Sno": [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20],
                   "a": [3,-4,2, -1, -3, -1,-7,-6, 1, 1, -1, 1,4,5,-3,2,3,4, -1, -2],
                   "pos_neg": ['false','true','false','true','true','true','true','true','false','false','true','false','false','false','true','false','false','false','true','true'],
                   "neg_val_count":[0,1,1,2,1,1,1,1,1,0,1,1,1,1,2,2,2,2,3,3]})

df2=spark.createDataFrame(df) 

столбец pos_neg представляет, если поля в 'a' положительны или отрицательны, если отрицательны, то этоправда.'neg_val_count' является счетчиком отрицательных значений в полях для переменной 'b'.Каждый раз, когда переменная «b» меняет счетчик, он сбрасывается, и последовательные отрицательные значения принимаются как одиночные.Следовательно, для переменной 'B' (в столбце 'b') счетчик равен единице, хотя есть три отрицательных значения.

Я хотел бы создать столбец, который будет иметь минимальное значение для комбинации переменных в 'b '(скажем, A) и значение в' a '(для истинных случаев между двумя ложными).например, для первой комбинации «A» и «истина» значение будет равно -4 (оно заключено в «ложь»), для второй комбинации «A» и значение «истина» будет равно -1, для B существует три последовательных значения «истина», поэтомузначение будет наименьшим из них как -7.В основном последовательные отрицательные значения принимаются как единое целое, а минимальное значение вычитается из них.Ожидаемое значение относится к требуемому результату

    b  Sno  a pos_neg  neg_val_count   expected value
0   A    1  3   false              0        3
1   A    2 -4    true              1       -4
2   A    3  2   false              1        2
3   A    4 -1    true              2       -1
4   B    5 -3    true              1       -7
5   B    6 -1    true              1       -7
6   B    7 -7    true              1       -7
7   C    8 -6    true              1       -6
8   C    9  1   false              1        1
9   D   10  1   false              0        1
10  D   11 -1    true              1       -1
11  D   12  1   false              1        1
12  D   13  4   false              1        4
13  D   14  5   false              1        5
14  D   15 -3    true              2       -3
15  D   16  2   false              2        2
16  D   17  3   false              2        3
17  D   18  4   false              2        4
18  D   19 -1    true              3       -2
19  D   20 -2    true              3       -2

Я пытался использовать следующее, но это не работает, любая поддержка в этом отношении будет отличной.

w3 = Window.partitionBy('b','pos_neg').rowsBetween(Window.unboundedPreceding, 0).orderBy('Sno')

df2.withColumn('new_col', F.min('a').over(w3))
...