Применяя сложную UDF для группы записей, я думаю, что UDF необходимы для решения этой проблемы. - PullRequest
0 голосов
/ 21 мая 2019

Я должен найти, что когда конкретный магазин меняет свой бренд, мне нужно заполнить mthid. Это должно применяться к каждому магазину.

+------+-----------+---------------+-------------+-------------+
|MTH_ID| store_id  |     brand     |    brndSales|   TotalSales|
+------+-----------+---------------+-------------+-------------+
|201801|      10941|            115|  80890.44900| 135799.66400|
|201712|      10941|            123| 517440.74500| 975893.79000|
|201711|      10941|            99 | 371501.92100| 574223.52300|
|201710|      10941|            115| 552435.57800| 746912.06700|
|201709|      10941|            115|1523492.60700|1871480.06800|
|201708|      10941|            115|1027698.93600|1236544.50900|
|201707|      10941|            33 |1469219.86900|1622949.53000|

Вывод выглядит следующим образом

+------+-----------+---------------+-------------+-------------+
|MTH_ID| store_id  |     brand     |    brndSales|   TotalSales|switchdate
+------+-----------+---------------+-------------+-------------+
|201801|      10941|            115|  80890.44900| 135799.66400| 201712
|201712|      10941|            123| 517440.74500| 975893.79000| 201711
|201711|      10941|            99 | 371501.92100| 574223.52300| 201710
|201710|      10941|            115| 552435.57800| 746912.06700| 201707
|201709|      10941|            115|1523492.60700|1871480.06800| 201707
|201708|      10941|            115|1027698.93600|1236544.50900| 201707
|201707|      10941|            33 |1469219.86900|1622949.53000| 201706

Я думал о применении задержки, но нам нужно проверить, есть ли изменения в колонке бренда. Если нет изменения в бренде, мы должны заполнить его, когда оно изменилось в последний раз.

Входные данные

val data = Seq((201801,      10941,            115,  80890.44900, 135799.66400),(201712,      10941,            123, 517440.74500, 975893.79000),(201711,      10941,            99 , 371501.92100, 574223.52300),(201710,      10941,            115, 552435.57800, 746912.06700),(201709,      10941,            115,1523492.60700,1871480.06800),(201708,      10941,            115,1027698.93600,1236544.50900),(201707,      10941,            33 ,1469219.86900,1622949.53000)).toDF("MTH_ID", "store_id" ,"brand" ,"brndSales","TotalSales")

Вывод из ответа

+------+--------+-----+-----------+-----------+---------------+---+----------+
|MTH_ID|store_id|brand|  brndSales| TotalSales|prev_brand_flag|grp|switchdate|
+------+--------+-----+-----------+-----------+---------------+---+----------+
|201801|   10941|  115|  80890.449| 135799.664|              1|  5|    201801|
|201712|   10941|  123| 517440.745|  975893.79|              1|  4|    201712|
|201711|   10941|   99| 371501.921| 574223.523|              1|  3|    201711|
|201710|   10941|  115| 552435.578| 746912.067|              0|  2|    201708|
|201709|   10941|  115|1523492.607|1871480.068|              0|  2|    201708|
|201708|   10941|  115|1027698.936|1236544.509|              1|  2|    201708|
|201707|   10941|   33|1469219.869| 1622949.53|              1|  1|    201707|
+------+--------+-----+-----------+-----------+---------------+---+----------+

Должны ли какие-либо доступные функции удовлетворять цели

1 Ответ

1 голос
/ 22 мая 2019

Решение PySpark.

Используйте lag с запущенным sum, чтобы проверить, изменилось ли значение по сравнению с предыдущей строкой, и, если это так, увеличить счетчик, чтобы установить группы.Как только группировка завершена, речь идет о получении min даты для каждой группы.

w1 = Window.partitionBy(df.store_id).orderBy(df.mth_id)
df = df.withColumn('prev_brand_flag',when(lag(df.brand).over(w1) == df.brand,0).otherwise(1))
df = df.withColumn('grp',sum(df.prev_brand_flag).over(w1))
w2 = Window.partitionBy(df.store_id,df.grp)
res = df.withColumn('switchdate',min(df.mth_id).over(w2))
res.show()

Просмотр результатов промежуточных фреймов данных даст вам представление о том, как работает логика.

...