df.show()
#exampledataframe
+-----+-----+
|rowID|value|
+-----+-----+
| 1| 5|
| 2| 7|
| 3| 6|
| 4| 9|
| 5| 4|
| 6| 3|
+-----+-----+
Ваш требуемый лог c слишком динамичен c для оконных функций, поэтому мы должны go построчно обновлять наши значения . Одним из решений может быть использование обычного python udf
в собранном списке и затем взрыва после применения udf
. Если есть относительно небольшие данные, это должно быть хорошо ( spark2.4 только из-за arrays_zip
).
from pyspark.sql import functions as F
from pyspark.sql.types import *
def add_one(a):
for i in range(1,len(a)):
if a[i]>a[i-1]:
a[i]=a[i-1]
return a
udf1= F.udf(add_one, ArrayType(IntegerType()))
df.agg(F.collect_list("rowID").alias("rowID"),F.collect_list("value").alias("value"))\
.withColumn("value", udf1("value"))\
.withColumn("zipped", F.explode(F.arrays_zip("rowID","value"))).select("zipped.*").show()
+-----+-----+
|rowID|value|
+-----+-----+
| 1| 5|
| 2| 5|
| 3| 5|
| 4| 5|
| 5| 4|
| 6| 3|
+-----+-----+
UPDATE:
Еще лучше, поскольку у вас есть группы по 5000, использование Pandas vectorized udf( grouped MAP)
должно очень помочь с обработка. И вам не нужно собирать список с 5000 целыми числами и взорвать или использовать pivot . Я думаю, что это должно быть оптимальным решением. Pandas UDAF available for spark2.3+
GroupBy ниже пусто, но вы можете добавить в него столбец группировки.
from pyspark.sql.functions import pandas_udf, PandasUDFType
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def grouped_map(df1):
for i in range(1, len(df1)):
if df1.loc[i, 'value']>df1.loc[i-1,'value']:
df1.loc[i,'value']=df1.loc[i-1,'value']
return df1
df.groupby().apply(grouped_map).show()
+-----+-----+
|rowID|value|
+-----+-----+
| 1| 5|
| 2| 5|
| 3| 5|
| 4| 5|
| 5| 4|
| 6| 3|
+-----+-----+