Оконная функция Разблокируйте связь с другим полем, чтобы получить последнюю запись - PullRequest
5 голосов
/ 11 июня 2019

У меня есть следующие данные, где данные разделены по магазинам и идентификатору месяца и упорядочены по количеству, чтобы получить основного поставщика для магазина.

Мне нужен прерыватель связи, если сумма равнамежду двумя поставщиками, то, если один из связанных поставщиков в предыдущие месяцы был самым продаваемым, сделайте этого продавца самым продаваемым за месяц.

Если снова возникнет связь, оглядка назад увеличится.Задержка в 1 месяц не сработает, если снова будет ничья.В худшем случае у нас будет больше дубликатов в предыдущем месяце.

пример данных

val data = Seq((201801,      10941,            115,  80890.44900, 135799.66400),
               (201801,      10941,            3,  80890.44900, 135799.66400) ,
               (201712,      10941,            3, 517440.74500, 975893.79000),
               (201712,      10941,            115, 517440.74500, 975893.79000),
               (201711,      10941,            3 , 371501.92100, 574223.52300),
               (201710,      10941,            115, 552435.57800, 746912.06700),
               (201709,      10941,            115,1523492.60700,1871480.06800),
               (201708,      10941,            115,1027698.93600,1236544.50900),
               (201707,      10941,            33 ,1469219.86900,1622949.53000)
               ).toDF("MTH_ID", "store_id" ,"brand" ,"brndSales","TotalSales")

Код:

val window = Window.partitionBy("store_id","MTH_ID").orderBy("brndSales")
val res = data.withColumn("rank",rank over window)

Вывод:

    +------+--------+-----+-----------+-----------+----+
 |MTH_ID|store_id|brand|  brndSales| TotalSales|rank|
+------+--------+-----+-----------+-----------+----+
|201801|   10941|  115|  80890.449| 135799.664|   1|
|201801|   10941|    3|  80890.449| 135799.664|   1|
|201712|   10941|    3| 517440.745|  975893.79|   1|
|201712|   10941|  115| 517440.745|  975893.79|   1|
|201711|   10941|  115| 371501.921| 574223.523|   1|
|201710|   10941|  115| 552435.578| 746912.067|   1|
|201709|   10941|  115|1523492.607|1871480.068|   1|
|201708|   10941|  115|1027698.936|1236544.509|   1|
|201707|   10941|   33|1469219.869| 1622949.53|   1|
+------+--------+-----+-----------+-----------+----+

Мой ранг 1 для 1 и 2 записей, но мой ранг должен быть 1 для второй записи на основе максимальных долларов за предыдущий месяц

Я ожидаю следующий вывод.

    +------+--------+-----+-----------+-----------+----+
    |MTH_ID|store_id|brand|  brndSales| TotalSales|rank|
    +------+--------+-----+-----------+-----------+----+
    |201801|   10941|  115|  80890.449| 135799.664|   2|
    |201801|   10941|    3|  80890.449| 135799.664|   1|
    |201712|   10941|    3| 517440.745|  975893.79|   1|
    |201712|   10941|  115| 517440.745|  975893.79|   1|
    |201711|   10941|    3| 371501.921| 574223.523|   1|
    |201710|   10941|  115| 552435.578| 746912.067|   1|
    |201709|   10941|  115|1523492.607|1871480.068|   1|
    |201708|   10941|  115|1027698.936|1236544.509|   1|
    |201707|   10941|   33|1469219.869| 1622949.53|   1|
    +------+--------+-----+-----------+-----------+----+

Должен ли я написать UDAF?Любые предложения помогут.

Ответы [ 2 ]

3 голосов
/ 13 июня 2019

Вы можете сделать это с 2 окнами. Во-первых, вам нужно будет использовать функцию lag () для переноса значений продаж за предыдущий месяц, чтобы вы могли использовать их в своем окне рейтинга. вот эта часть в pyspark:

lag_window = Window.partitionBy("store_id", "brand").orderBy("MTH_ID")
lag_df = data.withColumn("last_month_sales", lag("brndSales").over(lag_window))

Затем отредактируйте ваше окно, добавив в него новый столбец:

window = Window.partitionBy("store_id","MTH_ID").orderBy("brndSales", "last_month_sales")
lag_df.withColumn("rank",rank().over(window)).show()
+------+--------+-----+-----------+-----------+----------------+----+
|MTH_ID|store_id|brand|  brndSales| TotalSales|last_month_sales|rank|
+------+--------+-----+-----------+-----------+----------------+----+
|201711|   10941|   99| 371501.921| 574223.523|            null|   1|
|201709|   10941|  115|1523492.607|1871480.068|     1027698.936|   1|
|201707|   10941|   33|1469219.869| 1622949.53|            null|   1|
|201708|   10941|  115|1027698.936|1236544.509|            null|   1|
|201710|   10941|  115| 552435.578| 746912.067|     1523492.607|   1|
|201712|   10941|    3| 517440.745|  975893.79|            null|   1|
|201801|   10941|    3|  80890.449| 135799.664|      517440.745|   1|
|201801|   10941|  115|  80890.449| 135799.664|      552435.578|   2|
+------+--------+-----+-----------+-----------+----------------+----+
0 голосов
/ 18 июня 2019

Для каждой строки соберите массив предыдущих продаж этого бренда в структуре (Месяц, Продажи).

val storeAndBrandWindow = Window.partitionBy("store_id", "brand").orderBy($"MTH_ID")
val df1 = data.withColumn("brndSales_list", collect_list(struct($"MTH_ID", $"brndSales")).over(storeAndBrandWindow))

Обратный массив с UDF.

val returnType = ArrayType(StructType(Array(StructField("month", IntegerType), StructField("sales", DoubleType))))
val reverseUdf = udf((list: Seq[Row]) => list.reverse, returnType)
val df2 = df1.withColumn("brndSales_list", reverseUdf($"brndSales_list"))

А затем сортировать по массиву.

val window = Window.partitionBy("store_id", "MTH_ID").orderBy($"brndSales_list".desc)
val df3 = df2.withColumn("rank", rank over window).orderBy("MTH_ID", "brand")
df3.show(false)

Результат

+------+--------+-----+-----------+-----------+-----------------------------------------------------------------------------------------+----+
|MTH_ID|store_id|brand|brndSales  |TotalSales |brndSales_list                                                                           |rank|
+------+--------+-----+-----------+-----------+-----------------------------------------------------------------------------------------+----+
|201707|10941   |33   |1469219.869|1622949.53 |[[201707, 1469219.869]]                                                                  |1   |
|201708|10941   |115  |1027698.936|1236544.509|[[201708, 1027698.936]]                                                                  |1   |
|201709|10941   |115  |1523492.607|1871480.068|[[201709, 1523492.607], [201708, 1027698.936]]                                           |1   |
|201710|10941   |115  |552435.578 |746912.067 |[[201710, 552435.578], [201709, 1523492.607], [201708, 1027698.936]]                     |1   |
|201711|10941   |99   |371501.921 |574223.523 |[[201711, 371501.921]]                                                                   |1   |
|201712|10941   |3    |517440.745 |975893.79  |[[201712, 517440.745]]                                                                   |1   |
|201801|10941   |3    |80890.449  |135799.664 |[[201801, 80890.449], [201712, 517440.745]]                                              |1   |
|201801|10941   |115  |80890.449  |135799.664 |[[201801, 80890.449], [201710, 552435.578], [201709, 1523492.607], [201708, 1027698.936]]|2   |
+------+--------+-----+-----------+-----------+-----------------------------------------------------------------------------------------+----+
...