Я пытаюсь вычислить значение действия, которое не было получено из дополнительного зачисления.
Ввод:
+------+--------+------+
|period|activity|credit|
+------+--------+------+
| 1| 5| 0|
| 2| 0| 3|
| 3| 4| 0|
| 4| 0| 3|
| 5| 1| 0|
| 6| 1| 0|
| 7| 5| 0|
| 8| 0| 1|
| 9| 0| 1|
| 10| 5| 0|
+------+--------+------+
Выход:
rdd = sc.parallelize([(5,0,5),(0,3,0),(4,0,1),(0,3,0),(1,0,0),(1,0,0),(5,0,4),(0,1,0),(0,1,0),(5,0,3)])
df = rdd.toDF(["activity","credit","realActivity"])
+------+--------+------+------------+
|period|activity|credit|realActivity|
+------+--------+------+------------+
| 1| 5| 0| 5|
| 2| 0| 3| 0|
| 3| 4| 0| 1|
| 4| 0| 3| 0|
| 5| 1| 0| 0|
| 6| 1| 0| 0|
| 7| 5| 0| 4|
| 8| 0| 1| 0|
| 9| 0| 1| 0|
| 10| 5| 0| 3|
+------+--------+------+------------+
Я пытался создатьстолбец кредитного баланса, который добавляет и вычитает в зависимости от типа строки, но я не могу перезапустить его условно (каждый раз, когда он опускается ниже нуля) в зависимости от самого себя.Это похоже на рекурсивную проблему, которую я не уверен, как превратить в дружественный pyspark.Очевидно, что я не могу сделать следующее, самостоятельно ссылаясь на предыдущее значение ..
w = Window.orderBy("period")
df = df.withColumn("realActivity", lag("realActivity",1,0).over(w) - lag("credit", 1, 0).over(w) - lag("activity",1,0).over(w) )
Обновление: Как было отмечено, это невозможно при расчете окна.Поэтому я хотел бы сделать что-то вроде приведенного ниже фрагмента для расчета creditBalance, который позволил бы мне вычислить realActivity.
df['creditBalance']=0
for i in range(1, len(df)):
if (df.loc[i-1, 'creditBalance']) > 0:
df.loc[i, 'creditBalance'] = df.loc[i-1, 'creditBalance'] + df.loc[i, 'credit'] - df.loc[i, 'activity']
elif df.loc[i, 'creditamount'] > 0:
df.loc[i, 'creditBalance'] = df.loc[i, 'credit'] - df.loc[i, 'activity']
Теперь мой единственный вопрос: как я могу применить эту "локальную" функцию к каждой группе в кадре данных spark?
- записать кадр данных в файлы по группам и процессамлокально?
- настраивать карту и собирать строки для локального выполнения?
- сворачивать строки в одну строку по группе и обрабатывать это?
- что-нибудь еще?