Я работаю с искрой DataFrame
, где я хотел бы обнаружить любое значение из определенного столбца c, где значение не монотонно уменьшается. Для этих значений я хотел бы заменить их предыдущим значением в соответствии с критериями упорядочения.
Вот концептуальный пример, если у меня есть столбец значения [65, 66, 62, 100, 40]
. значение «100» не следует монотонному тренду уменьшения c и поэтому должно быть заменено на 62. Таким образом, результирующий список будет [65, 66, 62, 62, 40]
.
Ниже приведен код, который я создал для определения значения это должно быть заменено, однако я не знаю, как заменить значение предыдущим, а также как игнорировать начальное значение null
из lag
.
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as psf
from pyspark.sql.window import Window
sc = SparkContext(appName="sample-app")
sqlc = SQLContext(sc)
rdd = sc.parallelize([(1, 65), (2, 66), (3, 62), (4, 100), (5, 40)])
df = sqlc.createDataFrame(rdd, ["id", "value"])
window = Window.orderBy(df.id).rowsBetween(-1, -1)
sdf = df.withColumn(
"__monotonic_col",
(df.value <= psf.lag(df.value, 1).over(window)) & df.value.isNotNull(),
)
sdf.show()
Этот код производит следующий вывод :
+---+-----+---------------+
| id|value|__monotonic_col|
+---+-----+---------------+
| 1| 65| null|
| 2| 66| false|
| 3| 62| true|
| 4| 100| false|
| 5| 40| true|
+---+-----+---------------+