Complex pivot/unpivot in Spark Scala - PullRequest
0 votes
/ 02 August 2020

Schema of the SalesPerson table:

root
 |-- col: struct (nullable = false)
 |    |-- salesperson_4: string (nullable = true)
 |    |-- salesperson_10: string (nullable = true)
 |    |-- salesperson_11: string (nullable = true)
 |    |-- salesperson_21: string (nullable = true)

Data of the SalesPerson table:

+--------------+--------------+--------------+--------------+
|salesperson_4 |salesperson_10|salesperson_11|salesperson_21|
+--------------+--------------+--------------+--------------+
| Customer_933 | Customer_1760| Customer_454 | Customer_127 |
|Customer_1297 |Customer_2411 |Customer_158  |Customer_2703 |
|Customer_861  |Customer_1550 |Customer_812  |Customer_2976 |
+--------------+--------------+--------------+--------------+

My salesType dataframe looks like this.

Schema of salesType:

root
 |-- Type: string (nullable = true)
 |-- Customer: string (nullable = true)

Data of salesType

+------+-------------+
|Type  |Customer     |
+------+-------------+
|Online|Customer_933 |
|inshop|Customer_933 |
|inshop|Customer_1297|
|Online|Customer_2411|
|Online|Customer_2411|
|Online|Customer_1550|
|Online|Customer_2976|
|Online|Customer_812 |
|Online|Customer_812 |
|inshop|Customer_127 |
+------+-------------+

I am trying to check which of the customers from the Salesperson table are present in the salesType table, with two additional columns: one showing which salesperson each customer belongs to, and one counting how many times the customer occurs in the salesType table. In short: every customer from the Salesperson table, together with their occurrence count in salesType (0 if they never appear there).

Expected output:

+----------------------------+-------------+---------+
|CustomerBelongstoSalesperson|Customer     |Occurance|
+----------------------------+-------------+---------+
|salesperson_4               |Customer_933 |2        |
|salesperson_10              |Customer_2411|2        |
|salesperson_4               |Customer_1297|1        |
|salesperson_10              |Customer_1550|1        |
|salesperson_21              |Customer_2976|1        |
|salesperson_11              |Customer_812 |2        |
|salesperson_21              |Customer_127 |1        |
|salesperson_4               |Customer_861 |0        |
|salesperson_10              |Customer_1760|0        |
|salesperson_11              |Customer_454 |0        |
|salesperson_11              |Customer_158 |0        |
|salesperson_21              |Customer_2703|0        |
+----------------------------+-------------+---------+

Code:

val stringCol = df1.columns.map(c => s"'$c', cast(`$c` as string)").mkString(", ")
val processedDF = df1.selectExpr(s"stack(${df1.columns.length}, $stringCol) as (Salesperson, Customer)")
processedDF.show(false)

processedDF.join(df2, Seq("Customer"), "left")
  .groupBy("Customer")
  .agg(count("Type").as("Occurance"), first("Salesperson").as("Salesperson"))  // salesType has a Type column, not Place
  .show(false)

Many thanks. Please share your suggestions.


1 Answer

1 vote
/ 03 August 2020

This works in Spark 2.4.0+: unpivot the wide Salesperson table into (Salesperson, Customer) rows with the SQL stack function, left-join the result to the sales data, and count the matches per customer.

import spark.implicits._                              // assumes a SparkSession in scope named spark, as in spark-shell
import org.apache.spark.sql.functions.{count, first}

val sourceDF = Seq(
    ("Customer_933","Customer_1760","Customer_454","Customer_127"),
    ("Customer_1297","Customer_2411","Customer_158","Customer_2703"),
    ("Customer_861","Customer_1550","Customer_812","Customer_2976")
).toDF("salesperson_4","salesperson_10","salesperson_11","salesperson_21")
sourceDF.show()

/*
+-------------+--------------+--------------+--------------+
|salesperson_4|salesperson_10|salesperson_11|salesperson_21|
+-------------+--------------+--------------+--------------+
| Customer_933| Customer_1760|  Customer_454|  Customer_127|
|Customer_1297| Customer_2411|  Customer_158| Customer_2703|
| Customer_861| Customer_1550|  Customer_812| Customer_2976|
+-------------+--------------+--------------+--------------+
*/

val salesDF = Seq(
  ("Online","Customer_933"),
  ("inshop","Customer_933"),
  ("inshop","Customer_1297"),
  ("Online","Customer_2411"),
  ("Online","Customer_2411"),
  ("Online","Customer_1550"),
  ("Online","Customer_2976"),
  ("Online","Customer_812"),
  ("Online","Customer_812"),
  ("inshop","Customer_127")
).toDF("Type","Customer")

salesDF.show()

/*
+------+-------------+
|  Type|     Customer|
+------+-------------+
|Online| Customer_933|
|inshop| Customer_933|
|inshop|Customer_1297|
|Online|Customer_2411|
|Online|Customer_2411|
|Online|Customer_1550|
|Online|Customer_2976|
|Online| Customer_812|
|Online| Customer_812|
|inshop| Customer_127|
+------+-------------+
*/

val stringCol = sourceDF.columns.map(c => s"'$c', cast(`$c` as string)").mkString(", ")
val processedDF = sourceDF.selectExpr(s"stack(${sourceDF.columns.length}, $stringCol) as (Salesperson, Customer)")
processedDF.show(false)

/*
+--------------+-------------+
|Salesperson   |Customer     |
+--------------+-------------+
|salesperson_4 |Customer_933 |
|salesperson_10|Customer_1760|
|salesperson_11|Customer_454 |
|salesperson_21|Customer_127 |
|salesperson_4 |Customer_1297|
|salesperson_10|Customer_2411|
|salesperson_11|Customer_158 |
|salesperson_21|Customer_2703|
|salesperson_4 |Customer_861 |
|salesperson_10|Customer_1550|
|salesperson_11|Customer_812 |
|salesperson_21|Customer_2976|
+--------------+-------------+
*/
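For reference, stack(n, expr1, ..., exprk) splits the k expressions into n rows, so with four name/value pairs every wide row becomes four (Salesperson, Customer) rows. A hand-expanded sketch of the dynamically built expression above (column names taken from the schema shown; expandedDF is a hypothetical name):

val expandedDF = sourceDF.selectExpr(
  """stack(4,
    |  'salesperson_4',  cast(`salesperson_4`  as string),
    |  'salesperson_10', cast(`salesperson_10` as string),
    |  'salesperson_11', cast(`salesperson_11` as string),
    |  'salesperson_21', cast(`salesperson_21` as string)
    |) as (Salesperson, Customer)""".stripMargin
)
expandedDF.show(false)  // same 12 (Salesperson, Customer) rows as processedDF above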


processedDF.join(salesDF, Seq("Customer"), "left")
  .groupBy("Customer")
  .agg(count("Type").as("Occurance"), first("Salesperson").as("Salesperson"))
  .show(false)

/*
+-------------+---------+--------------+
|Customer     |Occurance|Salesperson   |
+-------------+---------+--------------+
|Customer_2411|2        |salesperson_10|
|Customer_158 |0        |salesperson_11|
|Customer_812 |2        |salesperson_11|
|Customer_1760|0        |salesperson_10|
|Customer_2703|0        |salesperson_21|
|Customer_861 |0        |salesperson_4 |
|Customer_127 |1        |salesperson_21|
|Customer_2976|1        |salesperson_21|
|Customer_1297|1        |salesperson_4 |
|Customer_454 |0        |salesperson_11|
|Customer_933 |2        |salesperson_4 |
|Customer_1550|1        |salesperson_10|
+-------------+---------+--------------+

*/
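Counting the joined Type column gives 0 for customers with no sales, because count skips the nulls produced by the left join, which is exactly the behaviour the question needs. If the column name and row order from the expected output matter, one might rename the salesperson column and sort matched customers first; a sketch, assuming the same processedDF and salesDF as above:

import org.apache.spark.sql.functions.{count, first, desc}

// Same aggregation, renamed and sorted to mirror the expected output
processedDF.join(salesDF, Seq("Customer"), "left")
  .groupBy("Customer")
  .agg(count("Type").as("Occurance"),
       first("Salesperson").as("CustomerBelongstoSalesperson"))
  .select("CustomerBelongstoSalesperson", "Customer", "Occurance")
  .orderBy(desc("Occurance"))
  .show(false)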