pyspark заменяет нулевые значения некоторым вычислением, связанным с последними ненулевыми значениями - PullRequest
2 голосов
/ 13 марта 2020

Привет, мой вопрос несколько связан с этим ( Заполните null ранее известным хорошим значением pyspark ), но в моей задаче есть небольшое изменение требования:

   data:                                        expected output:       
   +------+-----+---------+---------+-----+     +------+-----+---------+---------+-----+
   |  item|store|timestamp|sales_qty|stock|     |  item|store|timestamp|sales_qty|stock|
   +------+-----+---------+---------+-----+     +------+-----+---------+---------+-----+
   |673895|35578| 20180101|        1| null|     |673895|35578| 20180101|        1| null|
   |673895|35578| 20180102|        0|  110|     |673895|35578| 20180102|        0|  110|
   |673895|35578| 20180103|        1| null|     |673895|35578| 20180103|        1|  109|
   |673895|35578| 20180104|        0| null|     |673895|35578| 20180104|        0|  109|
   |673895|35578| 20180105|        0|  109|  => |673895|35578| 20180105|        0|  109|
   |673895|35578| 20180106|        1| null|     |673895|35578| 20180106|        1|  108|
   |673895|35578| 20180107|        0|  108|     |673895|35578| 20180107|        0|  108|
   |673895|35578| 20180108|        0| null|     |673895|35578| 20180108|        0|  108|
   |673895|35578| 20180109|        0| null|     |673895|35578| 20180109|        0|  108|
   |673895|35578| 20180110|        1| null|     |673895|35578| 20180110|        1|  107|
   +------+-----+---------+---------+-----+     +------+-----+---------+---------+-----+

мой ожидаемый результат основан на последнем известном ненулевом значении и sales_qty, если есть sales_qty, тогда стоимость акции должна быть скорректирована в соответствии с этим. Я попробовал следующую логику c

        my_window = Window.partitionBy('item','store').orderBy('timestamp')
        df = df.withColumn("stock", F.when((F.isnull(F.col('stock'))),F.lag(df.stock).over(my_window)-F.col('sales_qty')).otherwise(F.col('stock')))

, но она работает только для одного нулевого значения. Может ли кто-нибудь помочь мне достичь ожидаемого результата?

Примечание: количество НЕ всегда в непрерывном уменьшении, поэтому необходимо учитывать последнее ненулевое значение для расчета нового

1 Ответ

3 голосов
/ 15 марта 2020

Вы можете попробовать это. Я в основном генерирую два столбца: сначала (сначала ненулевое значение = 110) и stock2, который в основном представляет собой инкрементную сумму акций, а затем вычитает их друг из друга, чтобы получить желаемый запас.

from pyspark.sql.window import Window
from pyspark.sql import functions as F
w=Window().partitionBy("item","store").orderBy("timestamp")
w2=Window().partitionBy("item","store").orderBy("timestamp").rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)
df.withColumn("stock1", F.when(F.col("stock").isNull(), F.lit(0)).otherwise(F.col("stock")))\
.withColumn("stock2", F.sum("sales_qty").over(w)- F.lit(1))\
.withColumn("first", F.first("stock", True).over(w2))\
.withColumn("stock", F.col("first")-F.col("stock2"))\
.drop("stock1","stock2","first")\
.show()

+------+-----+---------+---------+-----+
|  item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101|        1|  110|
|673895|35578| 20180102|        0|  110|
|673895|35578| 20180103|        1|  109|
|673895|35578| 20180104|        0|  109|
|673895|35578| 20180105|        0|  109|
|673895|35578| 20180106|        1|  108|
|673895|35578| 20180107|        0|  108|
|673895|35578| 20180108|        0|  108|
|673895|35578| 20180109|        0|  108|
|673895|35578| 20180110|        1|  107|
+------+-----+---------+---------+-----+

Если вы хотите, чтобы ваше первое значение равнялось нулю вместо 110 (как показано в вашем желаемом выводе), вы можете использовать это (в основном использует rownumber для замены этого первого значения 110 на null):

from pyspark.sql.window import Window
from pyspark.sql import functions as F
w=Window().partitionBy("item","store").orderBy("timestamp")
w2=Window().partitionBy("item","store").orderBy("timestamp").rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)
df.withColumn("stock1", F.when(F.col("stock").isNull(), F.lit(0)).otherwise(F.col("stock")))\
.withColumn("stock2", F.sum("sales_qty").over(w)- F.lit(1))\
.withColumn("first", F.first("stock", True).over(w2))\
.withColumn("stock", F.col("first")-F.col("stock2"))\
.withColumn("num", F.row_number().over(w))\
.withColumn("stock", F.when(F.col("num")==1, F.lit(None)).otherwise(F.col("stock")))\
.drop("stock1","stock2","first","num")\
.show()


+------+-----+---------+---------+-----+
|  item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101|        1| null|
|673895|35578| 20180102|        0|  110|
|673895|35578| 20180103|        1|  109|
|673895|35578| 20180104|        0|  109|
|673895|35578| 20180105|        0|  109|
|673895|35578| 20180106|        1|  108|
|673895|35578| 20180107|        0|  108|
|673895|35578| 20180108|        0|  108|
|673895|35578| 20180109|        0|  108|
|673895|35578| 20180110|        1|  107|
+------+-----+---------+---------+-----+

Дополнительные данные ВХОД и ВЫХОД:

#input1
+------+-----+---------+---------+-----+
|  item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101|        1| null|
|673895|35578| 20180102|        0|  110|
|673895|35578| 20180103|        1| null|
|673895|35578| 20180104|        3| null|
|673895|35578| 20180105|        0|  109|
|673895|35578| 20180106|        1| null|
|673895|35578| 20180107|        0|  108|
|673895|35578| 20180108|        4| null|
|673895|35578| 20180109|        0| null|
|673895|35578| 20180110|        1| null|
+------+-----+---------+---------+-----+

#output1
+------+-----+---------+---------+-----+
|  item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101|        1| null|
|673895|35578| 20180102|        0|  110|
|673895|35578| 20180103|        1|  109|
|673895|35578| 20180104|        3|  106|
|673895|35578| 20180105|        0|  106|
|673895|35578| 20180106|        1|  105|
|673895|35578| 20180107|        0|  105|
|673895|35578| 20180108|        4|  101|
|673895|35578| 20180109|        0|  101|
|673895|35578| 20180110|        1|  100|
+------+-----+---------+---------+-----+


#input2
+------+-----+---------+---------+-----+
|  item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101|        1| null|
|673895|35578| 20180102|        0|  110|
|673895|35578| 20180103|        1| null|
|673895|35578| 20180104|        7| null|
|673895|35578| 20180105|        0|  102|
|673895|35578| 20180106|        0| null|
|673895|35578| 20180107|        4|   98|
|673895|35578| 20180108|        0| null|
|673895|35578| 20180109|        0| null|
|673895|35578| 20180110|        1| null|
+------+-----+---------+---------+-----+

#output2
+------+-----+---------+---------+-----+
|  item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101|        1| null|
|673895|35578| 20180102|        0|  110|
|673895|35578| 20180103|        1|  109|
|673895|35578| 20180104|        7|  102|
|673895|35578| 20180105|        0|  102|
|673895|35578| 20180106|        0|  102|
|673895|35578| 20180107|        4|   98|
|673895|35578| 20180108|        0|   98|
|673895|35578| 20180109|        0|   98|
|673895|35578| 20180110|        1|   97|
+------+-----+---------+---------+-----+

ЕСЛИ , значения stock НЕ являются непрерывными вот так:

  df.show()

+------+-----+---------+---------+-----+
|  item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101|        1| null|
|673895|35578| 20180102|        0|  110|
|673895|35578| 20180103|        1| null|
|673895|35578| 20180104|        7| null|
|673895|35578| 20180105|        0|  112|
|673895|35578| 20180106|        2| null|
|673895|35578| 20180107|        0|  107|
|673895|35578| 20180108|        0| null|
|673895|35578| 20180109|        0| null|
|673895|35578| 20180110|        1| null|
+------+-----+---------+---------+-----+

Вы могли бы использовать это :( я в основном вычисляю динамическое c окно для каждого непустого последнего)

from pyspark.sql.window import Window
from pyspark.sql import functions as F


w=Window().partitionBy("item","store").orderBy("timestamp")
w3=Window().partitionBy("item","store","stock5").orderBy("timestamp")
df.withColumn("stock1", F.when(F.col("stock").isNull(), F.lit(0)).otherwise(F.col("stock")))\
.withColumn("stock4", F.when(F.col("stock1")!=0, F.rank().over(w)).otherwise(F.col("stock1")))\
.withColumn("stock5", F.sum("stock4").over(w))\
.withColumn("stock6", F.sum("stock1").over(w3))\
.withColumn("sum", F.sum(F.when(F.col("stock1")!=F.col("stock6"),F.col("sales_qty")).otherwise(F.lit(0))).over(w3))\
.withColumn("stock2", F.when(F.col("sales_qty")!=0, F.col("stock6")-F.col("sum")).otherwise(F.col("stock")))\
.withColumn("stock", F.when((F.col("stock2").isNull())&(F.col("sales_qty")==0),F.col("stock6")-F.col("sum")).otherwise(F.col("stock2")))\
.drop("stock1","stock4","stock5","stock6","sum","stock2")\
.show()

+------+-----+---------+---------+-----+
|  item|store|timestamp|sales_qty|stock|
+------+-----+---------+---------+-----+
|673895|35578| 20180101|        1|    0|
|673895|35578| 20180102|        0|  110|
|673895|35578| 20180103|        1|  109|
|673895|35578| 20180104|        7|  102|
|673895|35578| 20180105|        0|  112|
|673895|35578| 20180106|        2|  110|
|673895|35578| 20180107|        0|  107|
|673895|35578| 20180108|        0|  107|
|673895|35578| 20180109|        0|  107|
|673895|35578| 20180110|        1|  106|
+------+-----+---------+---------+-----+
...