Добавление нового столбца в Spark DataFrame на основе опережений и задержек - PullRequest
0 голосов
/ 28 января 2020

У меня проблемы с созданием нового столбца для моего Spark DataFrame. Для иллюстрации у меня есть таблица с 3 столбцами (features, predictions, leadCol). Новый столбец примет значение столбца prediction, если "prediction" != "leadCol" and if on any row "prediction" === "leadCol", новый столбец должен добавить среднее значение prediction и предыдущее значение в новом столбце.

Возможно ли это сделать?

Я пробовал это, что не совсем работает:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val win_spec = Window.orderBy("features")

// create the new column here with only 1 as value
results = results.withColumn("final_preds", lit(1))

// function to create the new value
def func = when(col("prediction") === col("leadCol"), 
                      ((col("prediction") + lag(col("final_preds"), 1).over(win_spec)) / 2)).otherwise(col("prediction"))

results = results.withColumn("final_preds", func)

Пример ошибочного вывода это дает:

enter image description here

...