Как использовать функцию искрового окна в качестве каскадных изменений предыдущего ряда в следующий ряд - PullRequest
1 голос
/ 02 апреля 2020

Я пытался использовать оконную функцию для вычисления текущего значения на основе предыдущего значения в динаме c way

    rowID | value
------------------
     1    | 5
     2    | 7
     3    | 6

Logi c:

If value > pre_value then value

Так в строке 2 , так как 7> 5, то value становится 5. Конечный результат должен быть

    rowID | value
------------------
     1    | 5
     2    | 5
     3    | 5

Однако использование lag().over(w) дало результат как

    rowID | value
------------------
     1    | 5
     2    | 5
     3    | 6

, оно сравнивает значение третьей строки 6 против "7" не новое значение "5"

Есть предложения, как этого добиться?

1 Ответ

1 голос
/ 03 апреля 2020
df.show()
#exampledataframe
+-----+-----+
|rowID|value|
+-----+-----+
|    1|    5|
|    2|    7|
|    3|    6|
|    4|    9|
|    5|    4|
|    6|    3|
+-----+-----+

Ваш требуемый лог c слишком динамичен c для оконных функций, поэтому мы должны go построчно обновлять наши значения . Одним из решений может быть использование обычного python udf в собранном списке и затем взрыва после применения udf . Если есть относительно небольшие данные, это должно быть хорошо ( spark2.4 только из-за arrays_zip).

from pyspark.sql import functions as F
from pyspark.sql.types import *
def add_one(a):
    for i in range(1,len(a)):
       if a[i]>a[i-1]:
           a[i]=a[i-1]
    return a
udf1= F.udf(add_one, ArrayType(IntegerType()))
df.agg(F.collect_list("rowID").alias("rowID"),F.collect_list("value").alias("value"))\
  .withColumn("value", udf1("value"))\
  .withColumn("zipped", F.explode(F.arrays_zip("rowID","value"))).select("zipped.*").show()

+-----+-----+
|rowID|value|
+-----+-----+
|    1|    5|
|    2|    5|
|    3|    5|
|    4|    5|
|    5|    4|
|    6|    3|
+-----+-----+

UPDATE:

Еще лучше, поскольку у вас есть группы по 5000, использование Pandas vectorized udf( grouped MAP) должно очень помочь с обработка. И вам не нужно собирать список с 5000 целыми числами и взорвать или использовать pivot . Я думаю, что это должно быть оптимальным решением. Pandas UDAF available for spark2.3+

GroupBy ниже пусто, но вы можете добавить в него столбец группировки.

from pyspark.sql.functions import pandas_udf, PandasUDFType
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def grouped_map(df1):
   for i in range(1, len(df1)):
        if df1.loc[i, 'value']>df1.loc[i-1,'value']:
           df1.loc[i,'value']=df1.loc[i-1,'value']

   return df1
df.groupby().apply(grouped_map).show()

+-----+-----+
|rowID|value|
+-----+-----+
|    1|    5|
|    2|    5|
|    3|    5|
|    4|    5|
|    5|    4|
|    6|    3|
+-----+-----+ 
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...