В pyspark группируйте на основе поля переменной и добавьте счетчик для определенных значений (который сбрасывается при изменении переменной) - PullRequest
0 голосов
/ 10 июля 2019

Создание фрейма данных искры из фрейма данных pandas

import pandas as pd
df = pd.DataFrame({"b": ['A','A','A','A','B', 'B','B','C','C','D','D', 'D','D','D','D','D','D','D','D','D'],"Sno": [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20],"a": [3,-4,2, -1, -3, -1,-7,-6, 1, 1, -1, 1,4,5,-3,2,3,4, -1, -2]})

df2=spark.createDataFrame(df) 

Далее я использую оконный раздел в поле 'b'

from pyspark.sql import window
win_spec = (window.Window.partitionBy(['b']).orderBy("Sno").rowsBetween(window.Window.unboundedPreceding, 0))

Добавление поля положительное, отрицательное на основезначения и создал лямбда-функцию

df2 = df2.withColumn("pos_neg",col("a") <0)
pos_neg_func =udf(lambda x: ((x) & (x != x.shift())).cumsum())

попытался создать новый столбец (который является счетчиком для отрицательных значений, но в переменной 'b'. Таким образом, счетчик перезапускается при изменении поля в 'b'. Также, если естьявляются последовательными -ve значениями, они должны рассматриваться как одно значение, счетчик изменяется на 1

df3 = (df2.select('pos_neg',pos_neg_func('pos_neg').alias('val')))

Я хочу вывод как

      b  Sno  a    val  val_2
0   A    1  3  False      0
1   A    2 -4   True      1
2   A    3  2  False      1
3   A    4 -1   True      2
4   B    5 -3   True      1
5   B    6 -1   True      1
6   B    7 -7   True      1
7   C    8 -6   True      1
8   C    9  1  False      1
9   D   10  1  False      0
10  D   11 -1   True      1
11  D   12  1  False      1
12  D   13  4  False      1
13  D   14  5  False      1
14  D   15 -3   True      2
15  D   16  2  False      2
16  D   17  3  False      2
17  D   18  4  False      2
18  D   19 -1   True      3
19  D   20 -2   True      3

В Python простая функция, подобная следующейработает:

df['val'] = df.groupby('b')['pos_neg'].transform(lambda x: ((x) & (x != x.shift())).cumsum())

Джош-Фридлендер предоставил поддержку в приведенном выше коде

1 Ответ

1 голос
/ 10 июля 2019

Pyspark не имеет функции сдвига, но вы можете работать с оконной функцией lag , которая дает вам строку перед текущей строкой. Первое окно (называемое w) устанавливает значение столбца val равным 1, если значение столбца pos_neg равно True, а значение предыдущего pos_neg равно False и 0 в противном случае. Во втором окне (называемом w2) мы вычисляем накопленную сумму, чтобы получить желаемое

import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import Window

df = pd.DataFrame({"b": ['A','A','A','A','B', 'B','B','C','C','D','D', 'D','D','D','D','D','D','D','D','D'],"Sno": [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20],"a": [3,-4,2, -1, -3, -1,-7,-6, 1, 1, -1, 1,4,5,-3,2,3,4, -1, -2]})

df2=spark.createDataFrame(df) 

w = Window.partitionBy('b').orderBy('Sno')
w2 = Window.partitionBy('b').rowsBetween(Window.unboundedPreceding, 0).orderBy('Sno')

df2 = df2.withColumn("pos_neg",col("a") <0)

df2 = df2.withColumn('val', F.when((df2.pos_neg == True) & (F.lag('pos_neg', default=False).over(w) == False), 1).otherwise(0))
df2 = df2.withColumn('val',  F.sum('val').over(w2))

df2.show()

Выход:

+---+---+---+-------+---+ 
|Sno|  a|  b|pos_neg|val| 
+---+---+---+-------+---+ 
|  5| -3|  B|   true|  1| 
|  6| -1|  B|   true|  1| 
|  7| -7|  B|   true|  1| 
| 10|  1|  D|  false|  0| 
| 11| -1|  D|   true|  1| 
| 12|  1|  D|  false|  1| 
| 13|  4|  D|  false|  1| 
| 14|  5|  D|  false|  1| 
| 15| -3|  D|   true|  2| 
| 16|  2|  D|  false|  2| 
| 17|  3|  D|  false|  2| 
| 18|  4|  D|  false|  2| 
| 19| -1|  D|   true|  3| 
| 20| -2|  D|   true|  3| 
|  8| -6|  C|   true|  1| 
|  9|  1|  C|  false|  1| 
|  1|  3|  A|  false|  0| 
|  2| -4|  A|   true|  1| 
|  3|  2|  A|  false|  1| 
|  4| -1|  A|   true|  2| 
+---+---+---+-------+---+

Вы можете задаться вопросом, почему был необходим столбец, позволяющий нам упорядочить набор данных. Позвольте мне попытаться объяснить это на примере. Следующие данные были прочитаны пандами и им был присвоен индекс (левая колонка). Вы хотите сосчитать вхождения True в pos_neg и не хотите считать последовательные True. Эта логика приводит к столбцу val2, как показано ниже:

    b  Sno  a   pos_neg  val_2
0   A    1  3  False      0
1   A    2 -4   True      1
2   A    3  2  False      1
3   A    4 -1   True      2
4   A    5 -5   True      2

... но это зависит от индекса, полученного от панд (порядок строк). Когда вы измените порядок строк (и соответствующий индекс панд), вы получите другой результат, когда примените свою логику к одним и тем же строкам только потому, что порядок отличается:

    b  Sno  a   pos_neg  val_2
0   A    1  3  False      0
1   A    3  2  False      0
2   A    2 -4   True      1
3   A    4 -1   True      1
4   A    5 -5   True      1

Вы видите, что порядок строк важен. Теперь вы можете удивиться, почему pyspark не создает индекс, как pandas. Это связано с тем, что spark хранит ваши данные в нескольких разделах, которые распределены по вашему кластеру, и зависит от вашего источника данных, даже способного считывать данные распределенным образом. Следовательно, индекс не может быть добавлен во время чтения данных. Вы можете добавить один после чтения данных с помощью функции monotonically_increasing_id , но ваши данные могут уже иметь другой порядок по сравнению с вашим источником данных из-за процесса чтения.

Ваш столбец sno устраняет эту проблему и гарантирует, что вы получите всегда один и тот же результат для одних и тех же данных (детерминированный).

...