Найти предыдущее уникальное значение в Dataframe - Pyspark - PullRequest
0 голосов
/ 30 августа 2018

У меня есть датафрейм с несколькими продуктами для каждой даты по клиенту. В новой колонке я пытаюсь получить предыдущую уникальную дату по клиенту.

Cst Prod    Dt  Desired Output
C1  P1  1-Jan-16    0
C1  P2  1-Jan-16    0
C1  P3  1-Jan-16    0
C1  P4  1-Jan-16    0
C1  P1  20-Jan-16   1-Jan-16
C1  P2  20-Jan-16   1-Jan-16
C2  P2  5-Feb-17    0
C2  P3  5-Feb-17    0
C2  P4  5-Feb-17    0
C2  P1  30-Mar-17   5-Feb-17

Я только начинаю с PySpark. До сих пор я пытался создать столбец массива дат (CUM_DATE) для каждого клиента, а затем применить UDF, чтобы получить все даты, кроме одной в строке, а затем взять максимум столбца массива.

Что-то на линиях -

def filter_currdate(arr, dt):
    return [x for x in arr if x not in dt]

filter_currdate_udf = F.udf(lambda x: filter_code(x), ArrayType(DateType()))

df = df.withColumn('except_date', filter_currdate_udf(df['CUM_DATE'], df['Dt']))
df = df.withColumn('max_prev_date',F.max(df['except_date']))

Но он работает с ошибкой, и я не могу найти лучший способ получить этот вывод.

1 Ответ

0 голосов
/ 30 августа 2018

Есть другой способ без пользовательских функций UDF. Допустим, у df есть столбцы cst, prod, dt:

from pyspark.sql.functions import max
df.alias('df1').join(df.alias('df2'), 
( 
   col('df1.cst')==col('df2.cst') 
 & col('df1.prod') == col('df2.prod')
 & col('df1.dt') > col('df2.dt'),
 how='left_outer'
).select('df1.*', 'df2.dt')
.groupBy('df1.cst', 'df1.prod', 'df1.dt')
.agg(max('df2.dt'))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...