Добавить условие для функции last () в pyspark sql при использовании окном / разделом с прямой заливкой - PullRequest
1 голос
/ 27 апреля 2020

Код, который я получаю, находится по этой ссылке: https://johnpaton.net/posts/forward-fill-spark/ Он содержит некоторые сведения о задачах, которые я хочу выполнить sh.

from pyspark.sql import Window
from pyspark.sql.functions import last

# define the window
window = Window.partitionBy('location')\
               .orderBy('time')\
               .rowsBetween(-sys.maxsize, 0)

# define the forward-filled column
filled_column = last(spark_df['temperature'], ignorenulls=True).over(window)

# do the fill
spark_df_filled = spark_df.withColumn('temp_filled_spark', filled_column)

По существу, last() функция используется, чтобы найти статус последнего ненулевого значения. Если все значения равны NULL, возвращается значение NULL.

Однако я хотел бы назначить значение по умолчанию, если все столбцы в этой группе имеют значение NULL. Я пробовал разные способы, но не смог понять.

Так что, в принципе, если для местоположения все температуры равны нулю, я хотел бы иметь способ установить его со значением по умолчанию.

Some examples:
I want to fill them with default values for the case below:

location  temp                temp
1         null                0
1         null      =====>    0
1         null                0

I do not want to fill them with default values for the case below:

location  temp                 temp
1         null                 null
1          50      ======>      50
1          60                   60

1 Ответ

0 голосов
/ 27 апреля 2020

Возможно, вы можете определить другой столбец, который будет служить индикатором, если какая-либо запись в данном месте содержит ненулевое значение. Например:

window_2 = Window.partitionBy('location').rowsBetween(-sys.maxsize, sys.maxsize)
max_column = max(spark_df['temperature']).over(window_2)

Затем используйте этот столбец вместе с вашим filled_column для условного заполнения окончательного результата:

temp_filled_spark = when(max_column.isNull(),0).otherwise(filled_column)
spark_df_filled = spark_df.withColumn('temp_filled_spark', temp_filled_spark)

Возможно, не очень элегантный или суперэффективный, но должно работать.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...