Как получить значение последней строки, когда флаг равен 0, и получить текущее значение строки в новый столбец, когда флаг 1 в фрейме данных pyspark - PullRequest
1 голос
/ 07 мая 2020

Сценарий 1, когда Флаг 1: для строки, где Флаг равен 1, Скопируйте trx_date в Назначение

Сценарий 2, Когда Флаг 0: Для строки, где Флаг равен 0, Скопируйте предыдущее Целевое значение

Ввод:

+-----------+----+----------+
|customer_id|Flag|  trx_date|
+-----------+----+----------+
|          1|   1| 12/3/2020|
|          1|   0| 12/4/2020|
|          1|   1| 12/5/2020|
|          1|   1| 12/6/2020|
|          1|   0| 12/7/2020|
|          1|   1| 12/8/2020|
|          1|   0| 12/9/2020|
|          1|   0|12/10/2020|
|          1|   0|12/11/2020|
|          1|   1|12/12/2020|
|          2|   1| 12/1/2020|
|          2|   0| 12/2/2020|
|          2|   0| 12/3/2020|
|          2|   1| 12/4/2020|
+-----------+----+----------+

Выход:

+-----------+----+----------+-----------+
|customer_id|Flag|  trx_date|destination|
+-----------+----+----------+-----------+
|          1|   1| 12/3/2020|  12/3/2020|
|          1|   0| 12/4/2020|  12/3/2020|
|          1|   1| 12/5/2020|  12/5/2020|
|          1|   1| 12/6/2020|  12/6/2020|
|          1|   0| 12/7/2020|  12/6/2020|
|          1|   1| 12/8/2020|  12/8/2020|
|          1|   0| 12/9/2020|  12/8/2020|
|          1|   0|12/10/2020|  12/8/2020|
|          1|   0|12/11/2020|  12/8/2020|
|          1|   1|12/12/2020| 12/12/2020|
|          2|   1| 12/1/2020|  12/1/2020|
|          2|   0| 12/2/2020|  12/1/2020|
|          2|   0| 12/3/2020|  12/1/2020|
|          2|   1| 12/4/2020|  12/4/2020|
+-----------+----+----------+-----------+

Код для генерации искры Dataframe:

df = spark.createDataFrame([(1,1,'12/3/2020'),(1,0,'12/4/2020'),(1,1,'12/5/2020'),
(1,1,'12/6/2020'),(1,0,'12/7/2020'),(1,1,'12/8/2020'),(1,0,'12/9/2020'),(1,0,'12/10/2020'),
(1,0,'12/11/2020'),(1,1,'12/12/2020'),(2,1,'12/1/2020'),(2,0,'12/2/2020'),(2,0,'12/3/2020'),
(2,1,'12/4/2020')], ["customer_id","Flag","trx_date"])

Ответы [ 3 ]

2 голосов
/ 08 мая 2020

Pyspark способ сделать это. После получения trx_date в datetype, сначала получите incremental sum из Flag, чтобы создать groupings нам нужно, чтобы использовать функцию first в окне partitioned by those groupings. Мы можем использовать date_format, чтобы вернуть оба столбца в желаемый формат даты. Я предполагал, что ваш формат был MM/dd/yyyy, если он был другим, измените его на dd/MM/yyyy в коде.

df.show() #sample data
#+-----------+----+----------+
#|customer_id|Flag|  trx_date|
#+-----------+----+----------+
#|          1|   1| 12/3/2020|
#|          1|   0| 12/4/2020|
#|          1|   1| 12/5/2020|
#|          1|   1| 12/6/2020|
#|          1|   0| 12/7/2020|
#|          1|   1| 12/8/2020|
#|          1|   0| 12/9/2020|
#|          1|   0|12/10/2020|
#|          1|   0|12/11/2020|
#|          1|   1|12/12/2020|
#|          2|   1| 12/1/2020|
#|          2|   0| 12/2/2020|
#|          2|   0| 12/3/2020|
#|          2|   1| 12/4/2020|
#+-----------+----+----------+

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w=Window().orderBy("customer_id","trx_date")
w1=Window().partitionBy("Flag2").orderBy("trx_date").rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)
df.withColumn("trx_date", F.to_date("trx_date", "MM/dd/yyyy"))\
  .withColumn("Flag2", F.sum("Flag").over(w))\
  .withColumn("destination", F.when(F.col("Flag")==0, F.first("trx_date").over(w1)).otherwise(F.col("trx_date")))\
  .withColumn("trx_date", F.date_format("trx_date","MM/dd/yyyy"))\
  .withColumn("destination", F.date_format("destination", "MM/dd/yyyy"))\
  .orderBy("customer_id","trx_date").drop("Flag2").show()

#+-----------+----+----------+-----------+
#|customer_id|Flag|  trx_date|destination|
#+-----------+----+----------+-----------+
#|          1|   1|12/03/2020| 12/03/2020|
#|          1|   0|12/04/2020| 12/03/2020|
#|          1|   1|12/05/2020| 12/05/2020|
#|          1|   1|12/06/2020| 12/06/2020|
#|          1|   0|12/07/2020| 12/06/2020|
#|          1|   1|12/08/2020| 12/08/2020|
#|          1|   0|12/09/2020| 12/08/2020|
#|          1|   0|12/10/2020| 12/08/2020|
#|          1|   0|12/11/2020| 12/08/2020|
#|          1|   1|12/12/2020| 12/12/2020|
#|          2|   1|12/01/2020| 12/01/2020|
#|          2|   0|12/02/2020| 12/01/2020|
#|          2|   0|12/03/2020| 12/01/2020|
#|          2|   1|12/04/2020| 12/04/2020|
#+-----------+----+----------+-----------+
2 голосов
/ 07 мая 2020

Вы можете использовать оконные функции. Я не уверен, поддерживает ли Spark sql стандартный параметр ignore nulls для lag().

Если это так, вы можете просто сделать:

select 
    t.*,
    case when flag = 1
        then trx_date
        else lag(case when flag = 1 then trx_date end ignore nulls) 
                over(partition by customer_id order by trx_date)
    end destination
from mytable t

В противном случае вы можете создавать группы с суммой окна сначала:

select
    customer_id,
    flag,
    trx_date,
    case when flag = 1
        then trx_date
        else min(trx_date) over(partition by customer_id, grp order by trx_date)
    end destination
from (
    select t.*, sum(flag) over(partition by customer_id order by trx_date) grp
    from mytable t
) t
1 голос
/ 08 мая 2020

Вы можете добиться этого следующим образом, если рассматриваете API фрейма данных

#Convert date format while creating window itself

window = Window().orderBy("customer_id",f.to_date('trx_date','MM/dd/yyyy'))

df1 = df.withColumn('destination', f.when(f.col('Flag')==1,f.col('trx_date'))).\
withColumn('destination',f.last(f.col('destination'),ignorenulls=True).over(window))

df1.show()

+-----------+----+----------+-----------+
|customer_id|Flag|  trx_date|destination|
+-----------+----+----------+-----------+
|          1|   1| 12/3/2020|  12/3/2020|
|          1|   0| 12/4/2020|  12/3/2020|
|          1|   1| 12/5/2020|  12/5/2020|
|          1|   1| 12/6/2020|  12/6/2020|
|          1|   0| 12/7/2020|  12/6/2020|
|          1|   1| 12/8/2020|  12/8/2020|
|          1|   0| 12/9/2020|  12/8/2020|
|          1|   0|12/10/2020|  12/8/2020|
|          1|   0|12/11/2020|  12/8/2020|
|          1|   1|12/12/2020| 12/12/2020|
|          2|   1| 12/1/2020|  12/1/2020|
|          2|   0| 12/2/2020|  12/1/2020|
|          2|   0| 12/3/2020|  12/1/2020|
|          2|   1| 12/4/2020|  12/4/2020|
+-----------+----+----------+-----------+

Надеюсь, это поможет.

...