Найти последовательное снижение total_sale на spark sql - PullRequest
0 голосов
/ 31 октября 2019

У меня есть данные в таблице / Dataframe.

table/dataframe: temptable/temp_df 
StoreId,Total_Sales,Date
S1,10000,01-Jan-18
S1,20000,02-Jan-18
S1,25000,03-Jan-18
S1,30000,04-Jan-18
S1,29000,05-Jan-18--> total sales value is decline from previous value(04-jan-18)
S1,28500,06-Jan-18--> total sales value is decline from previous value(05-jan-18)
S1,25500,07-Jan-18--> total sales value is decline from previous value(06-jan-18)(output row)
S1,25500,08-Jan-18--> total sales value is constant from previous value(07-jan-18)
S1,30000,09-Jan-18
S1,29000,10-Jan-18-->same
S1,28000,11-Jan-18-->same
S1,25000,12-Jan-18-->same (output row)
S1,25000,13-Jan-18
S1,30000,14-Jan-18
S1,29000,15-Jan-18 
S1,28000,16-Jan-18 

, поэтому я хочу те записи из фрейма данных / таблицы, которые отклоняются подряд 3 раза. если итоговое значение имеет тот же total_sale, то оно не будет считаться ни уменьшением, ни увеличением.

Ожидаемый результат:

StoreId,Total_Sales,Date
S1,25500,07-Jan-18
S1,25000,12-Jan-18

1 Ответ

0 голосов
/ 31 октября 2019
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql import Window

sc = SparkSession.builder.appName("example").\
config("spark.driver.memory","1g").\
config("spark.executor.cores",2).\
config("spark.max.cores",4).getOrCreate()



df = sc.read.format("csv").option("header","true").option("delimiter",",").load("storesales.csv")

w = Window.partitionBy("StoreID").orderBy("Date")


df = df.withColumn("oneprev",f.lag("Total_Sales",1).over(w)).withColumn("twoprev",f.lag("Total_Sales",2).over(w))



df = df.withColumn("isdeclining",f.when((df["Total_Sales"].cast("double") < df["oneprev"].cast("double")) & (df["oneprev"].cast("double") < df["twoprev"].cast("double")) ,"declining").otherwise("notdeclining"))
df = df.withColumn("oneprev_isdeclining",f.lag("isdeclining",1).over(w)).withColumn("twoprev_isdeclining",f.lag("isdeclining",2).over(w))

df = df.filter ((df ["isdeclining"] == "отклоняется") & (df ["oneprev_isdeclining"]! = "Отклоняется") & (df ["twoprev_isdeclining"]! = "отклонение ")). select ([" StoreID "," Date "," Total_Sales "])

df.show()

Вы можете объединить некоторые строки в одну строку, но в идеале оптимизатор sql spark должен позаботиться об этом

Sample input +-------+-----------+---------+
|StoreId|Total_Sales|     Date|
+-------+-----------+---------+
|     S1|      10000|01-Jan-18|
|     S1|      20000|02-Jan-18|
|     S1|      25000|03-Jan-18|
|     S1|      30000|04-Jan-18|
|     S1|      29000|05-Jan-18|
|     S1|      28500|06-Jan-18|
|     S1|      25500|07-Jan-18|
|     S1|      25500|08-Jan-18|
|     S1|      30000|09-Jan-18|
|     S1|      29000|10-Jan-18|
|     S1|      28000|11-Jan-18|
|     S1|      25000|12-Jan-18|
|     S1|      25000|13-Jan-18|
|     S1|      30000|14-Jan-18|
|     S1|      29000|15-Jan-18|
+-------+-----------+---------+    

Desired Output :

+-------+---------+-----------+
|StoreID|     Date|Total_Sales|
+-------+---------+-----------+
|     S1|06-Jan-18|      28500|
|     S1|11-Jan-18|      28000|
+-------+---------+-----------+
...