pyspark - как добавить новый столбец на основе условий текущей и предыдущей строки - PullRequest
0 голосов
/ 09 июля 2020

У меня есть таблица этого формата

date           dept            rate

2020-07-06     Marketing.       20 
2020-07-06     Sales.           15
2020-07-06     Engg.            40
2020-07-06     Sites.           18
2020-07-08     Sales.           5
2020-07-08     Engg.            10
2020-07-08     Sites.           7

Я хочу добавить новый столбец «SpendRate» таким образом, чтобы за последние два дня (7 и 8 июля в примере) скопировалось значения от 6 июля "rate" до "Spendrate" ..

date           dept            rate.       Spendrate

2020-07-06     Marketing.       20            20   
2020-07-06     Sales.           15            15
2020-07-06     Engg.            40            40
2020-07-06     Sites.           18            18 
2020-07-07     Marketing.       20.           20
2020-07-08     Sales.           5.            15
2020-07-08     Engg.            10            40
2020-07-08     Sites.           7             18

Ответы [ 2 ]

2 голосов
/ 09 июля 2020

Используйте window first(col,ignoreNulls=True) с предложением rangeBetween для создания фрейма.

Example:

df.show()
#+----------+---------+----+
#|      date|     dept|rate|
#+----------+---------+----+
#|2020-07-06|Marketing|  20|
#|2020-07-06|    Sales|  15|
#|2020-07-06|     Engg|  40|
#|2020-07-06|    sites|  18|
#|2020-07-08|    Sales|   5|
#|2020-07-08|     Engg|  10|
#|2020-07-08|    sites|   7|
#|2020-07-07|Marketing|  20|
#+----------+---------+----+

sql("select *, first(rate,True) over(partition by dept order by cast (date as timestamp) RANGE BETWEEN INTERVAL 2 DAYS PRECEDING AND CURRENT ROW) as Spendrate from tmp order by date").show()

#for more specific range by checking datediff -1 or 0 then generating Spendrate column. 
sql("select date,dept,rate,case when diff=-1 then first(rate,True) over(partition by dept order by cast (date as timestamp) RANGE BETWEEN INTERVAL 2 DAYS PRECEDING AND CURRENT ROW)  when diff=0 then first(rate,True) over(partition by dept order by cast (date as timestamp) RANGE BETWEEN INTERVAL 2 DAYS PRECEDING AND CURRENT ROW) else rate end as Spendrate from (select *,datediff(date,current_date)diff from tmp)t order by date").show()
#+----------+---------+----+----------+
#|      date|     dept|rate| Spendrate|
#+----------+---------+----+----------+
#|2020-07-06|Marketing|  20| 20       |
#|2020-07-06|    sites|  18| 18       |
#|2020-07-06|     Engg|  40| 40       |
#|2020-07-06|    Sales|  15| 15       |
#|2020-07-07|Marketing|  20| 20       |
#|2020-07-08|    sites|   7| 18       |
#|2020-07-08|     Engg|  10| 40       |
#|2020-07-08|    Sales|   5| 15       |
#+----------+---------+----+----------+
0 голосов
/ 09 июля 2020
from pyspark.sql import Window, WindowSpec
import pyspark.sql.functions as F
import pandas as pd

# Create the test data Assuming everyday, we have data for all departments
date = ['2020-07-06', '2020-07-06', '2020-07-06', '2020-07-06','2020-07-07','2020-07-07','2020-07-07','2020-07-07', '2020-07-08', '2020-07-08','2020-07-08','2020-07-08' ]
dept = ['Marketing', 'Sales', 'Engg', 'Sites','Marketing', 'Sales', 'Engg', 'Sites','Marketing', 'Sales', 'Engg', 'Sites',]
rate = [20,15,40,18,20, 3, 6, 9, 100,5,10,7]
df = pd.DataFrame([date, dept, rate]).T
df.columns = ['date', 'dept', 'rate']

# create spark DtaFrame
sdf = spark.createDataFrame(df)

sdf.show()

+----------+---------+----+
|      date|     dept|rate|
+----------+---------+----+
|2020-07-06|Marketing|  20|
|2020-07-06|    Sales|  15|
|2020-07-06|     Engg|  40|
|2020-07-06|    Sites|  18|
|2020-07-07|Marketing|  20|
|2020-07-07|    Sales|   3|
|2020-07-07|     Engg|   6|
|2020-07-07|    Sites|   9|
|2020-07-08|Marketing| 100|
|2020-07-08|    Sales|   5|
|2020-07-08|     Engg|  10|
|2020-07-08|    Sites|   7|
+----------+---------+----+

# Lag returns by one day
windowSpec = Window.partitionBy('dept').orderBy('date')
value_column = 'rate_shift'
value_ff = F.lag(sdf['rate'], offset=2).over(windowSpec)
sdf = sdf.withColumn(value_column, value_ff)
# returns = returns.withColumn(value_column, value_ff)

sdf.orderBy('date').show()

+----------+---------+----+----------+
|      date|     dept|rate|rate_shift|
+----------+---------+----+----------+
|2020-07-06|Marketing|  20|      null|
|2020-07-06|    Sales|  15|      null|
|2020-07-06|     Engg|  40|      null|
|2020-07-06|    Sites|  18|      null|
|2020-07-07|Marketing|  20|      null|
|2020-07-07|     Engg|   6|      null|
|2020-07-07|    Sales|   3|      null|
|2020-07-07|    Sites|   9|      null|
|2020-07-08|    Sales|   5|        15|
|2020-07-08|     Engg|  10|        40|
|2020-07-08|Marketing| 100|        20|
|2020-07-08|    Sites|   7|        18|
+----------+---------+----+----------+

rates are shiifted by 2 days
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...