Как обнаружить монотонное c снижение в pyspark - PullRequest
1 голос
/ 14 февраля 2020

Я работаю с искрой DataFrame, где я хотел бы обнаружить любое значение из определенного столбца c, где значение не монотонно уменьшается. Для этих значений я хотел бы заменить их предыдущим значением в соответствии с критериями упорядочения.

Вот концептуальный пример, если у меня есть столбец значения [65, 66, 62, 100, 40]. значение «100» не следует монотонному тренду уменьшения c и поэтому должно быть заменено на 62. Таким образом, результирующий список будет [65, 66, 62, 62, 40].

Ниже приведен код, который я создал для определения значения это должно быть заменено, однако я не знаю, как заменить значение предыдущим, а также как игнорировать начальное значение null из lag.

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as psf
from pyspark.sql.window import Window

sc = SparkContext(appName="sample-app")
sqlc = SQLContext(sc)

rdd = sc.parallelize([(1, 65), (2, 66), (3, 62), (4, 100), (5, 40)])
df = sqlc.createDataFrame(rdd, ["id", "value"])

window = Window.orderBy(df.id).rowsBetween(-1, -1)
sdf = df.withColumn(
    "__monotonic_col",
    (df.value <= psf.lag(df.value, 1).over(window)) & df.value.isNotNull(),
)


sdf.show()

Этот код производит следующий вывод :

+---+-----+---------------+
| id|value|__monotonic_col|
+---+-----+---------------+
|  1|   65|           null|
|  2|   66|          false|
|  3|   62|           true|
|  4|  100|          false|
|  5|   40|           true|
+---+-----+---------------+

1 Ответ

1 голос
/ 14 февраля 2020

Во-первых, если мое понимание верно, не следует ли заменить 66 (на 65), поскольку оно не следует тенденции к снижению?

Если это правильная интерпретация, то должно работать следующее (я добавил дополнительный столбец, чтобы сохранить порядок, но вы можете заключить все в один оператор создания столбца):

from pyspark.sql import functions as F

sdf = sdf.withColumn(
    "__monotonic_col_value",
    F.when(
        F.col("__monotonic_col")  | F.col("__monotonic_col").isNull(), df.value)
    .otherwise(
        psf.lag(df.value, 1).over(window)
    ),
)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...