Я думаю, будет сложно сделать это полностью без зацикливания. Но вы можете разделить работу между разными исполнителями и подмножествами в пандах с помощью udf. Чтобы это работало, должно быть достаточно точек останова (то есть точек данных, где значение меньше 0, и вы вставляете NULL).
Импорт:
from pyspark.sql import Window
from pyspark.sql.functions import last
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType
import pandas as pd
import numpy as np
from pyspark.sql.functions import col, lit, when
Входные данные:
df = spark.createDataFrame([[ 1, 7000.0, 30.0 ], [ 2, 0.0, 9.0], [3, 23628.0, 17.0], [4, 8373.0, 23.0], [5, -0.5, 4.0]], [ 'id', 'value1', 'value2' ]).cache()
Добавление следующего значения2 и установка точек останова, когда значение меньше 0:
dfwithnextvalue = df.alias("a").join(df.alias("b"), col("a.id") == col("b.id") - lit(1), 'left').select("a.*", col("b.value2").alias("nextvalue"))
dfstartnew = dfwithnextvalue.withColumn("startnew", when(col("value1") < lit(0), col("id")).otherwise(lit(None)))\
.withColumn("startnew", when(col("id") == lit(1), lit(1)).otherwise(col("startnew")))
window = Window.orderBy('id')
rolled = last(col('startnew'), ignorenulls=True).over(window)
dfstartnewrolled = dfstartnew.withColumn("startnew", rolled)
Теперь мы можем группировать по столбцу startnew
и обрабатывать каждую фигуру в пандах. Мои знания панд не велики, но, похоже, это работает:
@pandas_udf("id long, value1 double, value2 double", PandasUDFType.GROUPED_MAP)
def loopdata(df):
df = df.set_index('id').sort_index()
for i in range(0, len(df.index)):
if i == 0:
df.loc[df.index[0], 'value2'] = np.nan
elif df.loc[df.index[i], 'value1'] < 0:
df.loc[df.index[i], 'value2'] = np.nan
elif df.loc[df.index[i], 'value1'] > 0:
df.loc[df.index[i], 'value2'] = df.loc[df.index[i-1], 'value2']
else:
nextvalue = df.loc[df.index[i], 'nextvalue']
if pd.isna(nextvalue):
nextvalue = 0
prevvalue = df.loc[df.index[i-1], 'value2']
if pd.isna(prevvalue):
prevvalue = 0
df.loc[df.index[i], 'value2'] = (nextvalue + prevvalue)/2.0
df = df.drop(columns=['nextvalue', 'startnew'])
df = df.reset_index()
return df
Теперь вы можете вычислить результат:
dfstartnewrolled.groupBy("startnew").apply(loopdata)