Здесь не нужен `stack`. Достаточно:
- прочитать текстовый файл с разделителем `:`;
- разбить (`split`) и развернуть (`explode`) список клиентов;
- выполнить left join с `df2`;
- посчитать количество вхождений.
1. Загрузка данных: прочитайте текстовый файл с разделителем `:`
// Salesperson -> customers mapping, one "<salesperson>: <comma-separated customers>" entry per line.
val data1 =
"""
|Salesperson_21: Customer_575,Customer_2703,Customer_2682,Customer_2615
|Salesperson_11: Customer_454,Customer_158,Customer_1859,Customer_2605
|Salesperson_10: Customer_1760,Customer_613,Customer_3008,Customer_1265
|Salesperson_4: Customer_1545,Customer_1312,Customer_861,Customer_2178
""".stripMargin
// Split on any line terminator ("\\R") rather than System.lineSeparator(): the literal's line
// breaks come from the source text, not the platform, so "\r\n" would never match them on
// Windows. The blank lines produced by the literal's leading/trailing newline are dropped.
// Splitting on ":" leaves a leading space in Customer; it is trimmed in step 2.
val df1 = spark.read
  .schema("Salesperson STRING, Customer STRING")
  .option("sep", ":")
  .csv(data1.split("\\R").filterNot(_.trim.isEmpty).toSeq.toDS())
df1.show(false)
df1.printSchema()
/**
 * +--------------+-------------------------------------------------------+
 * |Salesperson |Customer |
 * +--------------+-------------------------------------------------------+
 * |Salesperson_21| Customer_575,Customer_2703,Customer_2682,Customer_2615|
 * |Salesperson_11| Customer_454,Customer_158,Customer_1859,Customer_2605 |
 * |Salesperson_10| Customer_1760,Customer_613,Customer_3008,Customer_1265|
 * |Salesperson_4 | Customer_1545,Customer_1312,Customer_861,Customer_2178|
 * +--------------+-------------------------------------------------------+
 *
 * root
 * |-- Salesperson: string (nullable = true)
 * |-- Customer: string (nullable = true)
 */
// Customer type table in a "|"-separated layout, with a header row.
val data2 =
"""
|Type |Customer
|shop |Customer_17
|Home |Customer_2703
|shop |Customer_2703
|Home |Customer_575
|Shop |Customer_202
""".stripMargin
// Turn the "|"-separated lines into CSV: split each line on "|", strip the padding around
// every field and join the fields with ",". Splitting on "\\R" (any line terminator) instead
// of System.lineSeparator() keeps this working regardless of platform; blank lines are dropped.
val stringDS2 = data2.split("\\R")
  .map(_.split("\\|").map(_.trim).mkString(","))
  .filterNot(_.isEmpty)
  .toSeq.toDS()
// An explicit schema (same style as df1) avoids the extra pass over the data that
// inferSchema would trigger just to discover two string columns. With header = true the
// first line ("Type,Customer") is skipped.
val df2 = spark.read
  .schema("Type STRING, Customer STRING")
  .option("sep", ",")
  .option("header", "true")
  .option("nullValue", "null")
  .csv(stringDS2)
df2.show(false)
df2.printSchema()
/**
 * +----+-------------+
 * |Type|Customer |
 * +----+-------------+
 * |shop|Customer_17 |
 * |Home|Customer_2703|
 * |shop|Customer_2703|
 * |Home|Customer_575 |
 * |Shop|Customer_202 |
 * +----+-------------+
 *
 * root
 * |-- Type: string (nullable = true)
 * |-- Customer: string (nullable = true)
 */
2. Разбейте (`split`) и разверните (`explode`) список клиентов
// One row per (Salesperson, Customer): trim the whole list, split it on commas (the regex
// also swallows any spaces around a comma, so IDs never carry stray whitespace) and
// explode the resulting array into separate rows.
val processedDF = df1.withColumn("Customer", explode(split(trim(col("Customer")), "\\s*,\\s*")))
processedDF.show(false)
/**
 * +--------------+-------------+
 * |Salesperson |Customer |
 * +--------------+-------------+
 * |Salesperson_21|Customer_575 |
 * |Salesperson_21|Customer_2703|
 * |Salesperson_21|Customer_2682|
 * |Salesperson_21|Customer_2615|
 * |Salesperson_11|Customer_454 |
 * |Salesperson_11|Customer_158 |
 * |Salesperson_11|Customer_1859|
 * |Salesperson_11|Customer_2605|
 * |Salesperson_10|Customer_1760|
 * |Salesperson_10|Customer_613 |
 * |Salesperson_10|Customer_3008|
 * |Salesperson_10|Customer_1265|
 * |Salesperson_4 |Customer_1545|
 * |Salesperson_4 |Customer_1312|
 * |Salesperson_4 |Customer_861 |
 * |Salesperson_4 |Customer_2178|
 * +--------------+-------------+
 */
3. Left join с `df2` и 4. подсчёт вхождений
// Left join keeps every customer from processedDF. count("Type") skips the nulls that
// unmatched customers get, so they report 0.
// Grouping by (Customer, Salesperson) replaces first("Salesperson"): first() after a shuffle
// may return any row's value, and a customer listed under several salespeople would otherwise
// be merged into one row with an inflated count. The column name "Occurance" (sic) is kept
// unchanged; select() restores the original column order.
processedDF.join(df2, Seq("Customer"), "left")
  .groupBy("Customer", "Salesperson")
  .agg(count("Type").as("Occurance"))
  .select("Customer", "Occurance", "Salesperson")
  .show(false)
/**
 * Sample output (row order is not guaranteed after groupBy):
 * +-------------+---------+--------------+
 * |Customer |Occurance|Salesperson |
 * +-------------+---------+--------------+
 * |Customer_1312|0 |Salesperson_4 |
 * |Customer_1545|0 |Salesperson_4 |
 * |Customer_1760|0 |Salesperson_10|
 * |Customer_2682|0 |Salesperson_21|
 * |Customer_2703|2 |Salesperson_21|
 * |Customer_3008|0 |Salesperson_10|
 * |Customer_454 |0 |Salesperson_11|
 * |Customer_613 |0 |Salesperson_10|
 * |Customer_1265|0 |Salesperson_10|
 * |Customer_158 |0 |Salesperson_11|
 * |Customer_1859|0 |Salesperson_11|
 * |Customer_2178|0 |Salesperson_4 |
 * |Customer_2605|0 |Salesperson_11|
 * |Customer_2615|0 |Salesperson_21|
 * |Customer_575 |1 |Salesperson_21|
 * |Customer_861 |0 |Salesperson_4 |
 * +-------------+---------+--------------+
 */