Функция PySpark UDF с запросом фрейма данных? - PullRequest
0 голосов
/ 02 февраля 2019

У меня есть другое решение, но я предпочитаю использовать PySpark 2.3 для этого.

У меня есть двумерный фрейм данных PySpark, подобный этому:

Date       | ID
---------- | ----
08/31/2018 | 10
09/31/2018 | 10
09/01/2018 | null
09/01/2018 | null
09/01/2018 | 12

Я хотел заменить ID нулевые значения путем поиска ближайшего значения в прошлом или, если это значение равно нулю, путем просмотра в будущее (и, если оно снова равно нулю, установите значение по умолчанию)

Я представлял себе добавление нового столбца с.withColumn и использовать функцию UDF, которая будет запрашивать сам фрейм данных.

Что-то подобное в псевдокоде (не идеально, но это основная идея):

from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

def return_value(value,date):

    if value is not null:
        return val

    value1 = df.filter(df['date']<= date).select(df['value']).collect()

    if (value1)[0][0] is not null:
        return (value1)[0][0]

    value2 = df.filter(tdf['date']>= date).select(df['value']).collect()
        return (value2)[0][0]


value_udf = udf(return_value,StringType())
new_df = tr.withColumn("new_value", value_udf(df.value,df.date))

Но этоне работает.Я совершенно не на том пути?Можно ли только запросить фрейм данных Spark в функции UDF?Я пропустил более простое решение?

1 Ответ

0 голосов
/ 03 февраля 2019

Создайте новый фрейм данных с одним столбцом - уникальный список всех дат:

datesDF = yourDF.select('Date').distinct()

Создайте еще один, который будет состоять из дат и идентификаторов, но только тех, в которых нет нулей.А также давайте сохраним только первое (каким бы ни было первое) вхождение идентификатора для каждой даты (судя по вашему примеру, вы можете иметь несколько строк на дату)

noNullsDF = yourDF.dropna().dropDuplicates(subset='Date')

Теперь давайте объединяем эти две, чтобы у нас был списоквсех дат с любым значением, которое у нас есть для него (или ноль)

joinedDF = datesDF.join(noNullsDF, 'Date', 'left')

Теперь для каждой даты получите значение идентификатора из предыдущей и следующей дат, используя оконные функции, а также давайте переименуем наш столбец идентификаторов, чтобы позжепроблем с присоединением будет меньше:

from pyspark.sql.window import Window
from pyspark.sql import functions as f
w = Window.orderBy('Date')

joinedDF = joinedDF.withColumn('previousID',f.lag('ID').over(w)) 
                   .withColumn('nextID',f.lead('ID').over(w))
                   .withColumnRenamed('ID','newID') 

Теперь давайте присоединим его к нашему исходному фрейму данных по дате

yourDF = yourDF.join(joinedDF, 'Date', 'left')

Теперь наш фрейм данных имеет 4 столбца идентификатора:

  1. исходный идентификатор
  2. newID - идентификатор любого ненулевого значения заданной даты, если оно есть, или нулевое
  3. previousID - идентификатор с предыдущей даты (не равно нулю, если есть, или ноль)
  4. nextID - идентификатор со следующей даты (не ноль, если есть или ноль)

Теперь нам нужно объединить их в finalID по порядку:

  1. исходное значение, еслине нулевое
  2. значение для текущей даты, если естьприсутствует ненулевое значение (это противоречит вашему вопросу, но код панды предлагает вам перейти <= при проверке даты), если результат не равен нулю </li>
  3. значение для предыдущей даты, если оно не равно нулю
  4. значение дляследующая дата, если она не равна нулю
  5. какое-то значение по умолчанию

Мы делаем это просто путем объединения:

default = 0
finalDF = yourDF.select('Date', 
                        'ID',
                        f.coalesce('ID',
                                   'newID',
                                   'previousID',
                                   'nextID',
                                   f.lit(default)).alias('finalID')
                       )
...